1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
67use crate::peer::{
68 ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
69 PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74 RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
/// Configuration for a `HiveMesh` node.
///
/// Create one with [`HiveMeshConfig::new`] and adjust it with the `with_*`
/// builder methods; all fields are public and may also be set directly.
#[derive(Debug, Clone)]
pub struct HiveMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node; passed to `DocumentSync` when a mesh is built.
    pub callsign: String,

    /// Mesh identifier; used in the device name and as input when the mesh
    /// encryption key is derived.
    pub mesh_id: String,

    /// Peripheral type passed to `DocumentSync` when a mesh is built.
    pub peripheral_type: PeripheralType,

    /// Settings handed to the peer manager and the connection state graph.
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (`new` sets 5000).
    pub sync_interval_ms: u64,

    /// NOTE(review): presumably toggles automatic broadcasting of events; it is
    /// not read in this part of the file — confirm against its users.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret. When present, a mesh-wide encryption key
    /// is derived from it together with `mesh_id`.
    pub encryption_secret: Option<[u8; 32]>,

    /// When `true` and an encryption key is present, incoming documents that do
    /// not carry the encrypted marker are rejected.
    pub strict_encryption: bool,

    /// Whether relay envelopes received from peers should be forwarded.
    pub enable_relay: bool,

    /// Hop limit applied to relay envelopes created by this node.
    pub max_relay_hops: u8,

    /// Fanout handed to the `RandomFanout` gossip strategy.
    pub relay_fanout: usize,

    /// TTL, in milliseconds, handed to the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
157
158impl HiveMeshConfig {
159 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161 Self {
162 node_id,
163 callsign: callsign.into(),
164 mesh_id: mesh_id.into(),
165 peripheral_type: PeripheralType::SoldierSensor,
166 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167 sync_interval_ms: 5000,
168 auto_broadcast_events: true,
169 encryption_secret: None,
170 strict_encryption: false,
171 enable_relay: false,
172 max_relay_hops: DEFAULT_MAX_HOPS,
173 relay_fanout: 2,
174 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175 }
176 }
177
178 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183 self.encryption_secret = Some(secret);
184 self
185 }
186
187 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189 self.peripheral_type = ptype;
190 self
191 }
192
193 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195 self.sync_interval_ms = interval_ms;
196 self
197 }
198
199 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201 self.peer_config.peer_timeout_ms = timeout_ms;
202 self
203 }
204
205 pub fn with_max_peers(mut self, max: usize) -> Self {
207 self.peer_config.max_peers = max;
208 self
209 }
210
211 pub fn with_strict_encryption(mut self) -> Self {
219 self.strict_encryption = true;
220 self
221 }
222
223 pub fn with_relay(mut self) -> Self {
228 self.enable_relay = true;
229 self
230 }
231
232 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236 self.max_relay_hops = max_hops;
237 self
238 }
239
240 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244 self.relay_fanout = fanout.max(1);
245 self
246 }
247
248 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252 self.seen_cache_ttl_ms = ttl_ms;
253 self
254 }
255}
256
/// Lock-protected store of decoded application documents.
///
/// Entries are keyed by `(type_id, source_node, timestamp)` and hold the
/// type-erased document produced by the document registry.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
261
/// A mesh node: bundles peer tracking, document sync, encryption, relay and
/// identity state behind a single façade.
#[cfg(feature = "std")]
pub struct HiveMesh {
    /// Configuration the mesh was built from (its `node_id` is replaced by the
    /// identity's node ID when the mesh is created with an identity).
    config: HiveMeshConfig,

    /// Peer tracking; queried for connected peers and told about completed syncs.
    peer_manager: PeerManager,

    /// Local document state (counters, peripheral, emergency) used when
    /// building delta documents.
    document_sync: DocumentSync,

    /// Observers that receive `HiveEvent` notifications.
    observers: ObserverManager,

    /// Initialised to 0.
    /// NOTE(review): stored as `u32` while timestamps elsewhere are `u64`
    /// milliseconds — confirm how the value is narrowed where it is written.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0; same `u32` caveat as `last_sync_ms`.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` while encryption is disabled.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer E2EE session manager; `None` until per-peer E2EE is enabled.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph; updated when relayed envelopes arrive from an
    /// origin other than the direct sender.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of message IDs already seen, used to drop duplicate relays.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy that picks which connected peers receive a relayed message.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta sync bookkeeping (what was sent, byte counters).
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Device identity used for signing and attestations; `None` when the mesh
    /// was built without one.
    identity: Option<DeviceIdentity>,

    /// Registry of known peer identities (node ID to public key).
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Most recent peripheral received from each origin node.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode application documents and apply their delta operations.
    document_registry: DocumentRegistry,

    /// Decoded application documents received from the mesh.
    app_documents: AppDocumentStore,
}
337
338#[cfg(feature = "std")]
339impl HiveMesh {
340 pub fn new(config: HiveMeshConfig) -> Self {
342 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343 let document_sync = DocumentSync::with_peripheral_type(
344 config.node_id,
345 &config.callsign,
346 config.peripheral_type,
347 );
348
349 let encryption_key = config
351 .encryption_secret
352 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354 let connection_graph = ConnectionStateGraph::with_config(
356 config.peer_config.rssi_degraded_threshold,
357 config.peer_config.lost_timeout_ms,
358 );
359
360 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363 let gossip_strategy: Box<dyn GossipStrategy> =
365 Box::new(RandomFanout::new(config.relay_fanout));
366
367 let delta_encoder = DeltaEncoder::new(config.node_id);
369
370 let document_registry = DocumentRegistry::new();
372 #[cfg(feature = "hive-lite-sync")]
373 {
374 use crate::hive_lite_sync::CannedMessageDocument;
375 document_registry.register::<CannedMessageDocument>();
376 log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
377 }
378
379 Self {
380 config,
381 peer_manager,
382 document_sync,
383 observers: ObserverManager::new(),
384 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
385 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
386 encryption_key,
387 peer_sessions: std::sync::Mutex::new(None),
388 connection_graph: std::sync::Mutex::new(connection_graph),
389 seen_cache: std::sync::Mutex::new(seen_cache),
390 gossip_strategy,
391 delta_encoder: std::sync::Mutex::new(delta_encoder),
392 identity: None,
393 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
394 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
395 document_registry,
396 app_documents: std::sync::RwLock::new(HashMap::new()),
397 }
398 }
399
400 pub fn with_identity(config: HiveMeshConfig, identity: DeviceIdentity) -> Self {
406 let mut config = config;
408 config.node_id = identity.node_id();
409
410 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
411 let document_sync = DocumentSync::with_peripheral_type(
412 config.node_id,
413 &config.callsign,
414 config.peripheral_type,
415 );
416
417 let encryption_key = config
418 .encryption_secret
419 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
420
421 let connection_graph = ConnectionStateGraph::with_config(
422 config.peer_config.rssi_degraded_threshold,
423 config.peer_config.lost_timeout_ms,
424 );
425
426 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
427 let gossip_strategy: Box<dyn GossipStrategy> =
428 Box::new(RandomFanout::new(config.relay_fanout));
429 let delta_encoder = DeltaEncoder::new(config.node_id);
430
431 let document_registry = DocumentRegistry::new();
433 #[cfg(feature = "hive-lite-sync")]
434 {
435 use crate::hive_lite_sync::CannedMessageDocument;
436 document_registry.register::<CannedMessageDocument>();
437 log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
438 }
439
440 Self {
441 config,
442 peer_manager,
443 document_sync,
444 observers: ObserverManager::new(),
445 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
446 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
447 encryption_key,
448 peer_sessions: std::sync::Mutex::new(None),
449 connection_graph: std::sync::Mutex::new(connection_graph),
450 seen_cache: std::sync::Mutex::new(seen_cache),
451 gossip_strategy,
452 delta_encoder: std::sync::Mutex::new(delta_encoder),
453 identity: Some(identity),
454 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
455 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
456 document_registry,
457 app_documents: std::sync::RwLock::new(HashMap::new()),
458 }
459 }
460
461 pub fn from_genesis(
469 genesis: &crate::security::MeshGenesis,
470 identity: DeviceIdentity,
471 callsign: &str,
472 ) -> Self {
473 let config = HiveMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
474 .with_encryption(genesis.encryption_secret());
475
476 Self::with_identity(config, identity)
477 }
478
479 #[cfg(feature = "std")]
505 pub fn from_persisted(
506 state: crate::security::PersistedState,
507 callsign: &str,
508 ) -> Result<Self, crate::security::PersistenceError> {
509 let identity = state.restore_identity()?;
511
512 let genesis = state.restore_genesis();
514
515 let mesh = if let Some(ref gen) = genesis {
517 Self::from_genesis(gen, identity, callsign)
518 } else {
519 let config = HiveMeshConfig::new(identity.node_id(), callsign, "RESTORED");
520 Self::with_identity(config, identity)
521 };
522
523 let restored_registry = state.restore_registry();
525 if let Ok(mut registry) = mesh.identity_registry.lock() {
526 *registry = restored_registry;
527 }
528
529 log::info!(
530 "HiveMesh restored from persisted state: node_id={:08X}, known_peers={}",
531 mesh.config.node_id.as_u32(),
532 mesh.known_identity_count()
533 );
534
535 Ok(mesh)
536 }
537
538 #[cfg(feature = "std")]
551 pub fn to_persisted_state(
552 &self,
553 genesis: Option<&crate::security::MeshGenesis>,
554 ) -> Option<crate::security::PersistedState> {
555 let identity = self.identity.as_ref()?;
556 let registry = self.identity_registry.lock().ok()?;
557
558 Some(crate::security::PersistedState::with_registry(
559 identity, genesis, ®istry,
560 ))
561 }
562
563 pub fn is_encryption_enabled(&self) -> bool {
567 self.encryption_key.is_some()
568 }
569
570 pub fn is_strict_encryption_enabled(&self) -> bool {
574 self.config.strict_encryption && self.encryption_key.is_some()
575 }
576
577 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
582 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
583 &self.config.mesh_id,
584 secret,
585 ));
586 }
587
588 pub fn disable_encryption(&mut self) {
590 self.encryption_key = None;
591 }
592
593 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
598 match &self.encryption_key {
599 Some(key) => {
600 match key.encrypt_to_bytes(plaintext) {
602 Ok(ciphertext) => {
603 let mut buf = Vec::with_capacity(2 + ciphertext.len());
604 buf.push(ENCRYPTED_MARKER);
605 buf.push(0x00); buf.extend_from_slice(&ciphertext);
607 buf
608 }
609 Err(e) => {
610 log::error!("Encryption failed: {}", e);
611 plaintext.to_vec()
613 }
614 }
615 }
616 None => plaintext.to_vec(),
617 }
618 }
619
620 fn decrypt_document<'a>(
628 &self,
629 data: &'a [u8],
630 source_hint: Option<&str>,
631 ) -> Option<std::borrow::Cow<'a, [u8]>> {
632 log::debug!(
633 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
634 data.len(),
635 data.first().copied().unwrap_or(0),
636 source_hint
637 );
638
639 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
641 let _reserved = data[1];
643 let encrypted_payload = &data[2..];
644
645 log::debug!(
646 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
647 encrypted_payload.len()
648 );
649
650 match &self.encryption_key {
651 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
652 Ok(plaintext) => {
653 log::debug!(
654 "decrypt_document: SUCCESS, plaintext len={}",
655 plaintext.len()
656 );
657 Some(std::borrow::Cow::Owned(plaintext))
658 }
659 Err(e) => {
660 log::warn!(
661 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
662 e,
663 encrypted_payload.len(),
664 source_hint
665 );
666 self.notify(HiveEvent::SecurityViolation {
667 kind: SecurityViolationKind::DecryptionFailed,
668 source: source_hint.map(String::from),
669 });
670 None
671 }
672 },
673 None => {
674 log::warn!(
675 "decrypt_document: encryption not enabled but received encrypted doc"
676 );
677 None
678 }
679 }
680 } else {
681 if self.config.strict_encryption && self.encryption_key.is_some() {
684 log::warn!(
685 "Rejected unencrypted document in strict encryption mode (source: {:?})",
686 source_hint
687 );
688 self.notify(HiveEvent::SecurityViolation {
689 kind: SecurityViolationKind::UnencryptedInStrictMode,
690 source: source_hint.map(String::from),
691 });
692 None
693 } else {
694 Some(std::borrow::Cow::Borrowed(data))
696 }
697 }
698 }
699
700 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
714 self.decrypt_document(data, None)
715 .map(|cow| cow.into_owned())
716 }
717
718 pub fn has_identity(&self) -> bool {
722 self.identity.is_some()
723 }
724
725 pub fn public_key(&self) -> Option<[u8; 32]> {
727 self.identity.as_ref().map(|id| id.public_key())
728 }
729
730 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
734 self.identity
735 .as_ref()
736 .map(|id| id.create_attestation(now_ms))
737 }
738
739 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
748 self.identity_registry
749 .lock()
750 .unwrap()
751 .verify_or_register(attestation)
752 }
753
754 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
756 self.identity_registry.lock().unwrap().is_known(node_id)
757 }
758
759 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
761 self.identity_registry
762 .lock()
763 .unwrap()
764 .get_public_key(node_id)
765 .copied()
766 }
767
768 pub fn known_identity_count(&self) -> usize {
770 self.identity_registry.lock().unwrap().len()
771 }
772
773 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
778 self.identity_registry
779 .lock()
780 .unwrap()
781 .pre_register(node_id, public_key, now_ms);
782 }
783
784 pub fn forget_peer_identity(&self, node_id: NodeId) {
788 self.identity_registry.lock().unwrap().remove(node_id);
789 }
790
791 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
795 self.identity.as_ref().map(|id| id.sign(data))
796 }
797
798 pub fn verify_peer_signature(
803 &self,
804 node_id: NodeId,
805 data: &[u8],
806 signature: &[u8; 64],
807 ) -> bool {
808 if let Some(public_key) = self.peer_public_key(node_id) {
809 crate::security::verify_signature(&public_key, data, signature)
810 } else {
811 false
812 }
813 }
814
815 pub fn is_relay_enabled(&self) -> bool {
819 self.config.enable_relay
820 }
821
822 pub fn enable_relay(&mut self) {
824 self.config.enable_relay = true;
825 }
826
827 pub fn disable_relay(&mut self) {
829 self.config.enable_relay = false;
830 }
831
832 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
836 self.seen_cache.lock().unwrap().has_seen(message_id)
837 }
838
839 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
843 self.seen_cache
844 .lock()
845 .unwrap()
846 .check_and_mark(message_id, origin, now_ms)
847 }
848
849 pub fn seen_cache_size(&self) -> usize {
851 self.seen_cache.lock().unwrap().len()
852 }
853
854 pub fn clear_seen_cache(&self) {
856 self.seen_cache.lock().unwrap().clear();
857 }
858
859 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
864 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
865 .with_max_hops(self.config.max_relay_hops);
866 envelope.encode()
867 }
868
869 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
874 let connected = self.peer_manager.get_connected_peers();
875 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
876 connected
877 .into_iter()
878 .filter(|p| p.node_id != exclude)
879 .collect()
880 } else {
881 connected
882 };
883
884 self.gossip_strategy
885 .select_peers(&filtered)
886 .into_iter()
887 .cloned()
888 .collect()
889 }
890
891 pub fn process_relay_envelope(
901 &self,
902 data: &[u8],
903 source_peer: NodeId,
904 now_ms: u64,
905 ) -> Option<RelayDecision> {
906 let envelope = RelayEnvelope::decode(data)?;
908
909 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
912 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
913 source_peer,
914 envelope.origin_node,
915 envelope.hop_count,
916 now_ms,
917 );
918
919 if is_new {
920 log::debug!(
921 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
922 envelope.origin_node.as_u32(),
923 source_peer.as_u32(),
924 envelope.hop_count
925 );
926 }
927 }
928
929 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
931 let stats = self
933 .seen_cache
934 .lock()
935 .unwrap()
936 .get_stats(&envelope.message_id);
937 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
938
939 self.notify(HiveEvent::DuplicateMessageDropped {
940 origin_node: envelope.origin_node,
941 seen_count,
942 });
943
944 log::debug!(
945 "Dropped duplicate message {} from {:08X} (seen {} times)",
946 envelope.message_id,
947 envelope.origin_node.as_u32(),
948 seen_count
949 );
950 return None;
951 }
952
953 if !envelope.can_relay() {
955 self.notify(HiveEvent::MessageTtlExpired {
956 origin_node: envelope.origin_node,
957 hop_count: envelope.hop_count,
958 });
959
960 log::debug!(
961 "Message {} from {:08X} TTL expired at hop {}",
962 envelope.message_id,
963 envelope.origin_node.as_u32(),
964 envelope.hop_count
965 );
966
967 return Some(RelayDecision {
969 payload: envelope.payload,
970 origin_node: envelope.origin_node,
971 hop_count: envelope.hop_count,
972 should_relay: false,
973 relay_envelope: None,
974 });
975 }
976
977 let should_relay = self.config.enable_relay;
979 let relay_envelope = if should_relay {
980 envelope.relay() } else {
982 None
983 };
984
985 Some(RelayDecision {
986 payload: envelope.payload,
987 origin_node: envelope.origin_node,
988 hop_count: envelope.hop_count,
989 should_relay,
990 relay_envelope,
991 })
992 }
993
994 pub fn build_relay_document(&self) -> Vec<u8> {
999 let doc = self.build_document(); self.wrap_for_relay(doc)
1001 }
1002
1003 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1010 let mut encoder = self.delta_encoder.lock().unwrap();
1011 encoder.add_peer(peer_id);
1012 log::debug!(
1013 "Registered peer {:08X} for delta sync tracking",
1014 peer_id.as_u32()
1015 );
1016 }
1017
1018 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1022 let mut encoder = self.delta_encoder.lock().unwrap();
1023 encoder.remove_peer(peer_id);
1024 log::debug!(
1025 "Unregistered peer {:08X} from delta sync tracking",
1026 peer_id.as_u32()
1027 );
1028 }
1029
1030 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1035 let mut encoder = self.delta_encoder.lock().unwrap();
1036 encoder.reset_peer(peer_id);
1037 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1038 }
1039
1040 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1042 let mut encoder = self.delta_encoder.lock().unwrap();
1043 encoder.record_sent(peer_id, bytes);
1044 }
1045
1046 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1048 let mut encoder = self.delta_encoder.lock().unwrap();
1049 encoder.record_received(peer_id, bytes, timestamp);
1050 }
1051
1052 pub fn delta_stats(&self) -> DeltaStats {
1057 self.delta_encoder.lock().unwrap().stats()
1058 }
1059
1060 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1064 let encoder = self.delta_encoder.lock().unwrap();
1065 encoder
1066 .get_peer_state(peer_id)
1067 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1068 }
1069
1070 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1078 let mut all_operations: Vec<Operation> = Vec::new();
1080
1081 for (node_id_u32, count) in self.document_sync.counter_entries() {
1084 all_operations.push(Operation::IncrementCounter {
1085 counter_id: 0, node_id: NodeId::new(node_id_u32),
1087 amount: count,
1088 timestamp: count, });
1090 }
1091
1092 let peripheral = self.document_sync.peripheral_snapshot();
1095 let peripheral_timestamp = peripheral
1096 .last_event
1097 .as_ref()
1098 .map(|e| e.timestamp)
1099 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1101 peripheral,
1102 timestamp: peripheral_timestamp,
1103 });
1104
1105 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1107 let source_node = NodeId::new(emergency.source_node());
1108 let timestamp = emergency.timestamp();
1109
1110 all_operations.push(Operation::SetEmergency {
1112 source_node,
1113 timestamp,
1114 known_peers: emergency.all_nodes(),
1115 });
1116
1117 for acked_node in emergency.acked_nodes() {
1119 all_operations.push(Operation::AckEmergency {
1120 node_id: NodeId::new(acked_node),
1121 emergency_timestamp: timestamp,
1122 });
1123 }
1124 }
1125
1126 for app_op in self.app_document_delta_ops() {
1128 all_operations.push(Operation::App(app_op));
1129 }
1130
1131 let filtered_operations: Vec<Operation> = {
1133 let encoder = self.delta_encoder.lock().unwrap();
1134 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1135 all_operations
1136 .into_iter()
1137 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1138 .collect()
1139 } else {
1140 all_operations
1142 }
1143 };
1144
1145 if filtered_operations.is_empty() {
1147 return None;
1148 }
1149
1150 {
1152 let mut encoder = self.delta_encoder.lock().unwrap();
1153 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1154 for op in &filtered_operations {
1155 peer_state.mark_sent(&op.key(), op.timestamp());
1156 }
1157 }
1158 }
1159
1160 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1162 for op in filtered_operations {
1163 delta.add_operation(op);
1164 }
1165
1166 let encoded = delta.encode();
1168 let result = self.encrypt_document(&encoded);
1169
1170 {
1172 let mut encoder = self.delta_encoder.lock().unwrap();
1173 encoder.record_sent(peer_id, result.len());
1174 }
1175
1176 Some(result)
1177 }
1178
1179 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1184 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1185
1186 for (node_id_u32, count) in self.document_sync.counter_entries() {
1188 delta.add_operation(Operation::IncrementCounter {
1189 counter_id: 0,
1190 node_id: NodeId::new(node_id_u32),
1191 amount: count,
1192 timestamp: now_ms,
1193 });
1194 }
1195
1196 let peripheral = self.document_sync.peripheral_snapshot();
1198 let peripheral_timestamp = peripheral
1199 .last_event
1200 .as_ref()
1201 .map(|e| e.timestamp)
1202 .unwrap_or(now_ms);
1203 delta.add_operation(Operation::UpdatePeripheral {
1204 peripheral,
1205 timestamp: peripheral_timestamp,
1206 });
1207
1208 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1210 let source_node = NodeId::new(emergency.source_node());
1211 let timestamp = emergency.timestamp();
1212
1213 delta.add_operation(Operation::SetEmergency {
1214 source_node,
1215 timestamp,
1216 known_peers: emergency.all_nodes(),
1217 });
1218
1219 for acked_node in emergency.acked_nodes() {
1220 delta.add_operation(Operation::AckEmergency {
1221 node_id: NodeId::new(acked_node),
1222 emergency_timestamp: timestamp,
1223 });
1224 }
1225 }
1226
1227 for app_op in self.app_document_delta_ops() {
1229 delta.add_operation(Operation::App(app_op));
1230 }
1231
1232 let encoded = delta.encode();
1233 self.encrypt_document(&encoded)
1234 }
1235
1236 fn process_delta_document_internal(
1240 &self,
1241 source_node: NodeId,
1242 data: &[u8],
1243 now_ms: u64,
1244 relay_data: Option<Vec<u8>>,
1245 origin_node: Option<NodeId>,
1246 hop_count: u8,
1247 ) -> Option<DataReceivedResult> {
1248 let delta = DeltaDocument::decode(data)?;
1250
1251 if delta.origin_node == self.config.node_id {
1253 return None;
1254 }
1255
1256 let mut counter_changed = false;
1258 let mut emergency_changed = false;
1259 let mut is_emergency = false;
1260 let mut is_ack = false;
1261 let mut event_timestamp = 0u64;
1262 let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1263
1264 log::info!(
1265 "Delta document from {:08X}: {} operations, data_len={}",
1266 delta.origin_node.as_u32(),
1267 delta.operations.len(),
1268 data.len()
1269 );
1270 for op in &delta.operations {
1271 log::info!(" Operation: {}", op.key());
1272 match op {
1273 Operation::IncrementCounter {
1274 node_id, amount, ..
1275 } => {
1276 let current = self.document_sync.counter_entries();
1278 let current_value = current
1279 .iter()
1280 .find(|(id, _)| *id == node_id.as_u32())
1281 .map(|(_, v)| *v)
1282 .unwrap_or(0);
1283
1284 if *amount > current_value {
1285 counter_changed = true;
1288 }
1289 }
1290 Operation::UpdatePeripheral {
1291 peripheral,
1292 timestamp,
1293 } => {
1294 if let Ok(mut peripherals) = self.peer_peripherals.write() {
1296 peripherals.insert(delta.origin_node, peripheral.clone());
1297 }
1298 peer_peripheral = Some(peripheral.clone());
1300 if *timestamp > event_timestamp {
1302 event_timestamp = *timestamp;
1303 }
1304 }
1305 Operation::SetEmergency { timestamp, .. } => {
1306 is_emergency = true;
1307 emergency_changed = true;
1308 event_timestamp = *timestamp;
1309 }
1310 Operation::AckEmergency {
1311 emergency_timestamp,
1312 ..
1313 } => {
1314 is_ack = true;
1315 emergency_changed = true;
1316 if *emergency_timestamp > event_timestamp {
1317 event_timestamp = *emergency_timestamp;
1318 }
1319 }
1320 Operation::ClearEmergency {
1321 emergency_timestamp,
1322 } => {
1323 emergency_changed = true;
1324 if *emergency_timestamp > event_timestamp {
1325 event_timestamp = *emergency_timestamp;
1326 }
1327 }
1328 Operation::App(app_op) => {
1329 let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1335
1336 log::info!(
1337 "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1338 app_op.type_id,
1339 app_op.op_code,
1340 app_op.source_node,
1341 doc_timestamp,
1342 app_op.payload.len()
1343 );
1344
1345 let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1347 let changed = {
1348 let mut docs = self.app_documents.write().unwrap();
1349
1350 if let Some(existing) = docs.get_mut(&doc_key) {
1351 self.document_registry.apply_delta_op(
1353 app_op.type_id,
1354 existing.as_mut(),
1355 app_op,
1356 )
1357 } else {
1358 if let Some(decoded) = self
1360 .document_registry
1361 .decode(app_op.type_id, &app_op.payload)
1362 {
1363 docs.insert(doc_key, decoded);
1364 true
1365 } else {
1366 log::debug!(
1369 "Received delta for unknown doc {:?}, waiting for full state",
1370 doc_key
1371 );
1372 false
1373 }
1374 }
1375 };
1376
1377 self.observers.notify(HiveEvent::app_document_received(
1379 app_op.type_id,
1380 NodeId::new(app_op.source_node),
1381 doc_timestamp,
1382 changed,
1383 ));
1384 }
1385 }
1386 }
1387
1388 self.peer_manager.record_sync(source_node, now_ms);
1390
1391 {
1393 let mut encoder = self.delta_encoder.lock().unwrap();
1394 encoder.record_received(&source_node, data.len(), now_ms);
1395 }
1396
1397 if is_emergency {
1399 self.notify(HiveEvent::EmergencyReceived {
1400 from_node: delta.origin_node,
1401 });
1402 } else if is_ack {
1403 self.notify(HiveEvent::AckReceived {
1404 from_node: delta.origin_node,
1405 });
1406 }
1407
1408 if counter_changed {
1409 let total_count = self.document_sync.total_count();
1410 self.notify(HiveEvent::DocumentSynced {
1411 from_node: delta.origin_node,
1412 total_count,
1413 });
1414 }
1415
1416 if relay_data.is_some() {
1418 let relay_targets = self.get_relay_targets(Some(source_node));
1419 self.notify(HiveEvent::MessageRelayed {
1420 origin_node: origin_node.unwrap_or(delta.origin_node),
1421 relay_count: relay_targets.len(),
1422 hop_count,
1423 });
1424 }
1425
1426 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1427 DataReceivedResult::peripheral_fields(&peer_peripheral);
1428
1429 Some(DataReceivedResult {
1430 source_node: delta.origin_node,
1431 is_emergency,
1432 is_ack,
1433 counter_changed,
1434 emergency_changed,
1435 total_count: self.document_sync.total_count(),
1436 event_timestamp,
1437 relay_data,
1438 origin_node,
1439 hop_count,
1440 callsign,
1441 battery_percent,
1442 heart_rate,
1443 event_type,
1444 latitude,
1445 longitude,
1446 altitude,
1447 })
1448 }
1449
1450 pub fn enable_peer_e2ee(&self) {
1458 let mut sessions = self.peer_sessions.lock().unwrap();
1459 if sessions.is_none() {
1460 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1461 log::info!(
1462 "Per-peer E2EE enabled for node {:08X}",
1463 self.config.node_id.as_u32()
1464 );
1465 }
1466 }
1467
1468 pub fn disable_peer_e2ee(&self) {
1472 let mut sessions = self.peer_sessions.lock().unwrap();
1473 *sessions = None;
1474 log::info!("Per-peer E2EE disabled");
1475 }
1476
1477 pub fn is_peer_e2ee_enabled(&self) -> bool {
1479 self.peer_sessions.lock().unwrap().is_some()
1480 }
1481
1482 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1486 self.peer_sessions
1487 .lock()
1488 .unwrap()
1489 .as_ref()
1490 .map(|s| s.our_public_key())
1491 }
1492
1493 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1499 let mut sessions = self.peer_sessions.lock().unwrap();
1500 let session_mgr = sessions.as_mut()?;
1501
1502 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1503 let mut buf = Vec::with_capacity(2 + 37);
1504 buf.push(KEY_EXCHANGE_MARKER);
1505 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1507
1508 log::info!(
1509 "Initiated E2EE session with peer {:08X}",
1510 peer_node_id.as_u32()
1511 );
1512 Some(buf)
1513 }
1514
1515 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1517 self.peer_sessions
1518 .lock()
1519 .unwrap()
1520 .as_ref()
1521 .is_some_and(|s| s.has_session(peer_node_id))
1522 }
1523
1524 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1526 self.peer_sessions
1527 .lock()
1528 .unwrap()
1529 .as_ref()
1530 .and_then(|s| s.session_state(peer_node_id))
1531 }
1532
1533 pub fn send_peer_e2ee(
1538 &self,
1539 peer_node_id: NodeId,
1540 plaintext: &[u8],
1541 now_ms: u64,
1542 ) -> Option<Vec<u8>> {
1543 let mut sessions = self.peer_sessions.lock().unwrap();
1544 let session_mgr = sessions.as_mut()?;
1545
1546 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1547 Ok(encrypted) => {
1548 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1549 buf.push(PEER_E2EE_MARKER);
1550 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1552 Some(buf)
1553 }
1554 Err(e) => {
1555 log::warn!(
1556 "Failed to encrypt for peer {:08X}: {:?}",
1557 peer_node_id.as_u32(),
1558 e
1559 );
1560 None
1561 }
1562 }
1563 }
1564
1565 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1567 let mut sessions = self.peer_sessions.lock().unwrap();
1568 if let Some(session_mgr) = sessions.as_mut() {
1569 session_mgr.close_session(peer_node_id);
1570 self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1571 log::info!(
1572 "Closed E2EE session with peer {:08X}",
1573 peer_node_id.as_u32()
1574 );
1575 }
1576 }
1577
1578 pub fn peer_e2ee_session_count(&self) -> usize {
1580 self.peer_sessions
1581 .lock()
1582 .unwrap()
1583 .as_ref()
1584 .map(|s| s.session_count())
1585 .unwrap_or(0)
1586 }
1587
1588 pub fn peer_e2ee_established_count(&self) -> usize {
1590 self.peer_sessions
1591 .lock()
1592 .unwrap()
1593 .as_ref()
1594 .map(|s| s.established_count())
1595 .unwrap_or(0)
1596 }
1597
1598 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1603 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1604 return None;
1605 }
1606
1607 let payload = &data[2..];
1608 let msg = KeyExchangeMessage::decode(payload)?;
1609
1610 let mut sessions = self.peer_sessions.lock().unwrap();
1611 let session_mgr = sessions.as_mut()?;
1612
1613 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1614
1615 if established {
1616 self.notify(HiveEvent::PeerE2eeEstablished {
1617 peer_node_id: msg.sender_node_id,
1618 });
1619 log::info!(
1620 "E2EE session established with peer {:08X}",
1621 msg.sender_node_id.as_u32()
1622 );
1623 }
1624
1625 let mut buf = Vec::with_capacity(2 + 37);
1627 buf.push(KEY_EXCHANGE_MARKER);
1628 buf.push(0x00);
1629 buf.extend_from_slice(&response.encode());
1630 Some(buf)
1631 }
1632
1633 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1638 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1639 return None;
1640 }
1641
1642 let payload = &data[2..];
1643 let msg = PeerEncryptedMessage::decode(payload)?;
1644
1645 let mut sessions = self.peer_sessions.lock().unwrap();
1646 let session_mgr = sessions.as_mut()?;
1647
1648 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1649 Ok(plaintext) => {
1650 self.notify(HiveEvent::PeerE2eeMessageReceived {
1652 from_node: msg.sender_node_id,
1653 data: plaintext.clone(),
1654 });
1655 Some(plaintext)
1656 }
1657 Err(e) => {
1658 log::warn!(
1659 "Failed to decrypt E2EE message from {:08X}: {:?}",
1660 msg.sender_node_id.as_u32(),
1661 e
1662 );
1663 None
1664 }
1665 }
1666 }
1667
1668 pub fn node_id(&self) -> NodeId {
1672 self.config.node_id
1673 }
1674
1675 pub fn callsign(&self) -> &str {
1677 &self.config.callsign
1678 }
1679
1680 pub fn mesh_id(&self) -> &str {
1682 &self.config.mesh_id
1683 }
1684
1685 pub fn device_name(&self) -> String {
1687 format!(
1688 "HIVE_{}-{:08X}",
1689 self.config.mesh_id,
1690 self.config.node_id.as_u32()
1691 )
1692 }
1693
1694 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1699 self.peer_peripherals.read().ok().and_then(|peripherals| {
1700 peripherals
1701 .get(&node_id)
1702 .map(|p| p.callsign_str().to_string())
1703 })
1704 }
1705
1706 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1711 self.peer_peripherals
1712 .read()
1713 .ok()
1714 .and_then(|peripherals| peripherals.get(&node_id).cloned())
1715 }
1716
1717 pub fn document_registry(&self) -> &DocumentRegistry {
1732 &self.document_registry
1733 }
1734
1735 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1742 let type_id = T::TYPE_ID;
1743 let (source_node, timestamp) = doc.identity();
1744 let key = (type_id, source_node, timestamp);
1745
1746 let mut docs = self.app_documents.write().unwrap();
1747
1748 if let Some(existing) = docs.get_mut(&key) {
1749 self.document_registry
1751 .merge(type_id, existing.as_mut(), &doc)
1752 } else {
1753 docs.insert(key, Box::new(doc));
1755 true
1756 }
1757 }
1758
1759 pub fn store_app_document_boxed(
1766 &self,
1767 type_id: u8,
1768 source_node: u32,
1769 timestamp: u64,
1770 doc: Box<dyn core::any::Any + Send + Sync>,
1771 ) -> bool {
1772 let key = (type_id, source_node, timestamp);
1773
1774 let mut docs = self.app_documents.write().unwrap();
1775
1776 if let Some(existing) = docs.get_mut(&key) {
1777 self.document_registry
1779 .merge(type_id, existing.as_mut(), doc.as_ref())
1780 } else {
1781 docs.insert(key, doc);
1783 true
1784 }
1785 }
1786
1787 pub fn get_app_document<T: crate::registry::DocumentType>(
1791 &self,
1792 source_node: u32,
1793 timestamp: u64,
1794 ) -> Option<T> {
1795 let key = (T::TYPE_ID, source_node, timestamp);
1796
1797 let docs = self.app_documents.read().unwrap();
1798 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1799 }
1800
1801 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1805 let docs = self.app_documents.read().unwrap();
1806 docs.iter()
1807 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1808 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1809 .collect()
1810 }
1811
1812 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1816 let docs = self.app_documents.read().unwrap();
1817 let mut ops = Vec::new();
1818
1819 for ((type_id, _source, _ts), doc) in docs.iter() {
1820 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1821 ops.push(op);
1822 }
1823 }
1824
1825 ops
1826 }
1827
1828 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1832 let docs = self.app_documents.read().unwrap();
1833 docs.keys()
1834 .filter(|(tid, _, _)| *tid == type_id)
1835 .map(|(_, source, ts)| (*source, *ts))
1836 .collect()
1837 }
1838
1839 pub fn app_document_count(&self) -> usize {
1841 self.app_documents.read().unwrap().len()
1842 }
1843
1844 pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
1848 self.observers.add(observer);
1849 }
1850
1851 pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
1853 self.observers.remove(observer);
1854 }
1855
1856 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1863 let data = self.document_sync.send_emergency(timestamp);
1864 self.notify(HiveEvent::MeshStateChanged {
1865 peer_count: self.peer_manager.peer_count(),
1866 connected_count: self.peer_manager.connected_count(),
1867 });
1868 self.encrypt_document(&data)
1869 }
1870
1871 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1876 let data = self.document_sync.send_ack(timestamp);
1877 self.notify(HiveEvent::MeshStateChanged {
1878 peer_count: self.peer_manager.peer_count(),
1879 connected_count: self.peer_manager.connected_count(),
1880 });
1881 self.encrypt_document(&data)
1882 }
1883
1884 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
1891 self.encrypt_document(payload)
1892 }
1893
1894 pub fn clear_event(&self) {
1896 self.document_sync.clear_event();
1897 }
1898
1899 pub fn is_emergency_active(&self) -> bool {
1901 self.document_sync.is_emergency_active()
1902 }
1903
1904 pub fn is_ack_active(&self) -> bool {
1906 self.document_sync.is_ack_active()
1907 }
1908
1909 pub fn current_event(&self) -> Option<EventType> {
1911 self.document_sync.current_event()
1912 }
1913
1914 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1923 let data = self.document_sync.start_emergency(timestamp, known_peers);
1924 self.notify(HiveEvent::MeshStateChanged {
1925 peer_count: self.peer_manager.peer_count(),
1926 connected_count: self.peer_manager.connected_count(),
1927 });
1928 self.encrypt_document(&data)
1929 }
1930
1931 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1935 let peers: Vec<u32> = self
1936 .peer_manager
1937 .get_peers()
1938 .iter()
1939 .map(|p| p.node_id.as_u32())
1940 .collect();
1941 self.start_emergency(timestamp, &peers)
1942 }
1943
1944 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1949 let result = self.document_sync.ack_emergency(timestamp);
1950 if result.is_some() {
1951 self.notify(HiveEvent::MeshStateChanged {
1952 peer_count: self.peer_manager.peer_count(),
1953 connected_count: self.peer_manager.connected_count(),
1954 });
1955 }
1956 result.map(|data| self.encrypt_document(&data))
1957 }
1958
1959 pub fn clear_emergency(&self) {
1961 self.document_sync.clear_emergency();
1962 }
1963
1964 pub fn has_active_emergency(&self) -> bool {
1966 self.document_sync.has_active_emergency()
1967 }
1968
1969 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
1973 self.document_sync.get_emergency_status()
1974 }
1975
1976 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
1978 self.document_sync.has_peer_acked(peer_id)
1979 }
1980
1981 pub fn all_peers_acked(&self) -> bool {
1983 self.document_sync.all_peers_acked()
1984 }
1985
1986 pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1996 if self.document_sync.add_chat_message(sender, text, timestamp) {
1997 Some(self.encrypt_document(&self.build_document()))
1998 } else {
1999 None
2000 }
2001 }
2002
2003 pub fn send_chat_reply(
2011 &self,
2012 sender: &str,
2013 text: &str,
2014 reply_to_node: u32,
2015 reply_to_timestamp: u64,
2016 timestamp: u64,
2017 ) -> Option<Vec<u8>> {
2018 if self.document_sync.add_chat_reply(
2019 sender,
2020 text,
2021 reply_to_node,
2022 reply_to_timestamp,
2023 timestamp,
2024 ) {
2025 Some(self.encrypt_document(&self.build_document()))
2026 } else {
2027 None
2028 }
2029 }
2030
2031 pub fn chat_count(&self) -> usize {
2033 self.document_sync.chat_count()
2034 }
2035
2036 pub fn chat_messages_since(
2040 &self,
2041 since_timestamp: u64,
2042 ) -> Vec<(u32, u64, String, String, u32, u64)> {
2043 self.document_sync.chat_messages_since(since_timestamp)
2044 }
2045
2046 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2050 self.document_sync.all_chat_messages()
2051 }
2052
2053 pub fn on_ble_discovered(
2059 &self,
2060 identifier: &str,
2061 name: Option<&str>,
2062 rssi: i8,
2063 mesh_id: Option<&str>,
2064 now_ms: u64,
2065 ) -> Option<HivePeer> {
2066 let (node_id, is_new) = self
2067 .peer_manager
2068 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2069
2070 let peer = self.peer_manager.get_peer(node_id)?;
2071
2072 {
2074 let mut graph = self.connection_graph.lock().unwrap();
2075 graph.on_discovered(
2076 node_id,
2077 identifier.to_string(),
2078 name.map(|s| s.to_string()),
2079 mesh_id.map(|s| s.to_string()),
2080 rssi,
2081 now_ms,
2082 );
2083 }
2084
2085 if is_new {
2086 self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
2087 self.notify_mesh_state_changed();
2088 }
2089
2090 Some(peer)
2091 }
2092
2093 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2097 let node_id = self.peer_manager.on_connected(identifier, now_ms)?;
2098
2099 {
2101 let mut graph = self.connection_graph.lock().unwrap();
2102 graph.on_connected(node_id, now_ms);
2103 }
2104
2105 self.register_peer_for_delta(&node_id);
2107
2108 self.notify(HiveEvent::PeerConnected { node_id });
2109 self.notify_mesh_state_changed();
2110 Some(node_id)
2111 }
2112
2113 pub fn on_ble_disconnected(
2115 &self,
2116 identifier: &str,
2117 reason: DisconnectReason,
2118 ) -> Option<NodeId> {
2119 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2120
2121 {
2123 let mut graph = self.connection_graph.lock().unwrap();
2124 let platform_reason = match observer_reason {
2125 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2126 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2127 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2128 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2129 DisconnectReason::ConnectionFailed => {
2130 crate::platform::DisconnectReason::ConnectionFailed
2131 }
2132 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2133 };
2134 let now_ms = std::time::SystemTime::now()
2135 .duration_since(std::time::UNIX_EPOCH)
2136 .map(|d| d.as_millis() as u64)
2137 .unwrap_or(0);
2138 graph.on_disconnected(node_id, platform_reason, now_ms);
2139
2140 graph.remove_via_peer(node_id);
2143 }
2144
2145 self.unregister_peer_for_delta(&node_id);
2147
2148 self.notify(HiveEvent::PeerDisconnected {
2149 node_id,
2150 reason: observer_reason,
2151 });
2152 self.notify_mesh_state_changed();
2153 Some(node_id)
2154 }
2155
2156 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2160 if self
2161 .peer_manager
2162 .on_disconnected_by_node_id(node_id, reason)
2163 {
2164 {
2166 let mut graph = self.connection_graph.lock().unwrap();
2167 let platform_reason = match reason {
2168 DisconnectReason::LocalRequest => {
2169 crate::platform::DisconnectReason::LocalRequest
2170 }
2171 DisconnectReason::RemoteRequest => {
2172 crate::platform::DisconnectReason::RemoteRequest
2173 }
2174 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2175 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2176 DisconnectReason::ConnectionFailed => {
2177 crate::platform::DisconnectReason::ConnectionFailed
2178 }
2179 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2180 };
2181 let now_ms = std::time::SystemTime::now()
2182 .duration_since(std::time::UNIX_EPOCH)
2183 .map(|d| d.as_millis() as u64)
2184 .unwrap_or(0);
2185 graph.on_disconnected(node_id, platform_reason, now_ms);
2186
2187 graph.remove_via_peer(node_id);
2189 }
2190
2191 self.unregister_peer_for_delta(&node_id);
2193
2194 self.notify(HiveEvent::PeerDisconnected { node_id, reason });
2195 self.notify_mesh_state_changed();
2196 }
2197 }
2198
2199 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2203 let is_new = self
2204 .peer_manager
2205 .on_incoming_connection(identifier, node_id, now_ms);
2206
2207 {
2209 let mut graph = self.connection_graph.lock().unwrap();
2210 if is_new {
2211 graph.on_discovered(
2212 node_id,
2213 identifier.to_string(),
2214 None,
2215 Some(self.config.mesh_id.clone()),
2216 -50, now_ms,
2218 );
2219 }
2220 graph.on_connected(node_id, now_ms);
2221 }
2222
2223 self.register_peer_for_delta(&node_id);
2225
2226 if is_new {
2227 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2228 self.notify(HiveEvent::PeerDiscovered { peer });
2229 }
2230 }
2231
2232 self.notify(HiveEvent::PeerConnected { node_id });
2233 self.notify_mesh_state_changed();
2234
2235 is_new
2236 }
2237
2238 pub fn on_ble_data_received(
2245 &self,
2246 identifier: &str,
2247 data: &[u8],
2248 now_ms: u64,
2249 ) -> Option<DataReceivedResult> {
2250 let node_id = self.peer_manager.get_node_id(identifier)?;
2252
2253 if data.len() >= 2 {
2255 match data[0] {
2256 KEY_EXCHANGE_MARKER => {
2257 let _response = self.handle_key_exchange(data, now_ms);
2259 return None;
2261 }
2262 PEER_E2EE_MARKER => {
2263 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2265 return None;
2267 }
2268 RELAY_ENVELOPE_MARKER => {
2269 return self
2271 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2272 }
2273 _ => {}
2274 }
2275 }
2276
2277 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2279 }
2280
2281 #[allow(clippy::too_many_arguments)]
2283 fn process_document_data_with_identifier(
2284 &self,
2285 source_node: NodeId,
2286 identifier: &str,
2287 data: &[u8],
2288 now_ms: u64,
2289 relay_data: Option<Vec<u8>>,
2290 origin_node: Option<NodeId>,
2291 hop_count: u8,
2292 ) -> Option<DataReceivedResult> {
2293 let decrypted = self.decrypt_document(data, Some(identifier))?;
2295
2296 if DeltaDocument::is_delta_document(&decrypted) {
2298 return self.process_delta_document_internal(
2299 source_node,
2300 &decrypted,
2301 now_ms,
2302 relay_data,
2303 origin_node,
2304 hop_count,
2305 );
2306 }
2307
2308 let result = self.document_sync.merge_document(&decrypted)?;
2310
2311 if let Some(ref peripheral) = result.peer_peripheral {
2313 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2314 peripherals.insert(result.source_node, peripheral.clone());
2315 }
2316 }
2317
2318 self.peer_manager.record_sync(source_node, now_ms);
2320
2321 if result.is_emergency() {
2323 self.notify(HiveEvent::EmergencyReceived {
2324 from_node: result.source_node,
2325 });
2326 } else if result.is_ack() {
2327 self.notify(HiveEvent::AckReceived {
2328 from_node: result.source_node,
2329 });
2330 }
2331
2332 if result.counter_changed {
2333 self.notify(HiveEvent::DocumentSynced {
2334 from_node: result.source_node,
2335 total_count: result.total_count,
2336 });
2337 }
2338
2339 if relay_data.is_some() {
2341 let relay_targets = self.get_relay_targets(Some(source_node));
2342 self.notify(HiveEvent::MessageRelayed {
2343 origin_node: origin_node.unwrap_or(result.source_node),
2344 relay_count: relay_targets.len(),
2345 hop_count,
2346 });
2347 }
2348
2349 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2350 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2351
2352 Some(DataReceivedResult {
2353 source_node: result.source_node,
2354 is_emergency: result.is_emergency(),
2355 is_ack: result.is_ack(),
2356 counter_changed: result.counter_changed,
2357 emergency_changed: result.emergency_changed,
2358 total_count: result.total_count,
2359 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2360 relay_data,
2361 origin_node,
2362 hop_count,
2363 callsign,
2364 battery_percent,
2365 heart_rate,
2366 event_type,
2367 latitude,
2368 longitude,
2369 altitude,
2370 })
2371 }
2372
2373 fn handle_relay_envelope_with_identifier(
2375 &self,
2376 source_node: NodeId,
2377 identifier: &str,
2378 data: &[u8],
2379 now_ms: u64,
2380 ) -> Option<DataReceivedResult> {
2381 let envelope = RelayEnvelope::decode(data)?;
2383
2384 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2386 let stats = self
2387 .seen_cache
2388 .lock()
2389 .unwrap()
2390 .get_stats(&envelope.message_id);
2391 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2392
2393 self.notify(HiveEvent::DuplicateMessageDropped {
2394 origin_node: envelope.origin_node,
2395 seen_count,
2396 });
2397 return None;
2398 }
2399
2400 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2402 envelope.relay().map(|e| e.encode())
2403 } else {
2404 if !envelope.can_relay() {
2405 self.notify(HiveEvent::MessageTtlExpired {
2406 origin_node: envelope.origin_node,
2407 hop_count: envelope.hop_count,
2408 });
2409 }
2410 None
2411 };
2412
2413 self.process_document_data_with_identifier(
2415 source_node,
2416 identifier,
2417 &envelope.payload,
2418 now_ms,
2419 relay_data,
2420 Some(envelope.origin_node),
2421 envelope.hop_count,
2422 )
2423 }
2424
2425 pub fn on_ble_data_received_from_node(
2432 &self,
2433 node_id: NodeId,
2434 data: &[u8],
2435 now_ms: u64,
2436 ) -> Option<DataReceivedResult> {
2437 if data.len() >= 2 {
2439 match data[0] {
2440 KEY_EXCHANGE_MARKER => {
2441 let _response = self.handle_key_exchange(data, now_ms);
2442 return None;
2443 }
2444 PEER_E2EE_MARKER => {
2445 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2446 return None;
2447 }
2448 RELAY_ENVELOPE_MARKER => {
2449 return self.handle_relay_envelope(node_id, data, now_ms);
2451 }
2452 _ => {}
2453 }
2454 }
2455
2456 self.process_document_data(node_id, data, now_ms, None, None, 0)
2458 }
2459
2460 pub fn on_ble_data_received_anonymous(
2470 &self,
2471 identifier: &str,
2472 data: &[u8],
2473 now_ms: u64,
2474 ) -> Option<DataReceivedResult> {
2475 log::debug!(
2476 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2477 identifier,
2478 data.len(),
2479 data.first().copied().unwrap_or(0)
2480 );
2481
2482 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2484 Some(d) => d,
2485 None => {
2486 log::warn!(
2487 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2488 data.len(),
2489 identifier
2490 );
2491 return None;
2492 }
2493 };
2494
2495 if decrypted.len() < 8 {
2498 log::warn!("Decrypted document too short to extract source_node");
2499 return None;
2500 }
2501
2502 let source_node_u32 =
2503 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2504 let source_node = NodeId::new(source_node_u32);
2505
2506 log::info!(
2507 "Anonymous document from {}: source_node={:08X}, len={}",
2508 identifier,
2509 source_node_u32,
2510 decrypted.len()
2511 );
2512
2513 self.peer_manager
2516 .register_identifier(identifier, source_node);
2517
2518 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2520 log::info!(
2521 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2522 is_delta,
2523 decrypted.first().copied().unwrap_or(0),
2524 decrypted.len()
2525 );
2526
2527 if is_delta {
2528 return self.process_delta_document_internal(
2529 source_node,
2530 &decrypted,
2531 now_ms,
2532 None,
2533 None,
2534 0,
2535 );
2536 }
2537
2538 const APP_LAYER_MARKER: u8 = 0xAF;
2541 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2542 #[cfg(feature = "hive-lite-sync")]
2543 {
2544 use crate::hive_lite_sync::CannedMessageDocument;
2545 use crate::registry::DocumentType;
2546
2547 log::info!(
2548 "App-layer message (0xAF) from {:08X}, {} bytes - storing in registry",
2549 source_node.as_u32(),
2550 decrypted.len()
2551 );
2552
2553 let payload = &decrypted[1..];
2556 if let Some(doc) = CannedMessageDocument::decode(payload) {
2557 let (doc_source, doc_ts) = doc.identity();
2558 let changed = self.store_app_document(doc);
2559 log::info!(
2560 "Stored CannedMessage: source={:08X} ts={} changed={}",
2561 doc_source,
2562 doc_ts,
2563 changed
2564 );
2565
2566 self.observers.notify(HiveEvent::app_document_received(
2568 CannedMessageDocument::TYPE_ID,
2569 NodeId::new(doc_source),
2570 doc_ts,
2571 changed,
2572 ));
2573
2574 return Some(DataReceivedResult {
2576 source_node,
2577 is_emergency: false,
2578 is_ack: false,
2579 counter_changed: false,
2580 emergency_changed: false,
2581 total_count: 0,
2582 event_timestamp: doc_ts,
2583 relay_data: None, origin_node: None,
2585 hop_count: 0,
2586 callsign: None,
2587 battery_percent: None,
2588 heart_rate: None,
2589 event_type: None,
2590 latitude: None,
2591 longitude: None,
2592 altitude: None,
2593 });
2594 } else {
2595 log::warn!("Failed to decode 0xAF message as CannedMessageDocument");
2596 }
2597 }
2598
2599 #[cfg(not(feature = "hive-lite-sync"))]
2600 {
2601 log::debug!("Ignoring 0xAF message (hive-lite-sync feature not enabled)");
2602 }
2603
2604 return None;
2605 }
2606
2607 log::info!(
2609 "Processing legacy document from {:08X}",
2610 source_node.as_u32()
2611 );
2612 let result = self.document_sync.merge_document(&decrypted)?;
2613
2614 log::info!(
2616 "Merge result: peer_peripheral={}, counter_changed={}",
2617 result.peer_peripheral.is_some(),
2618 result.counter_changed
2619 );
2620 if let Some(ref p) = result.peer_peripheral {
2621 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2622 }
2623
2624 self.peer_manager.record_sync(source_node, now_ms);
2626
2627 if result.is_emergency() {
2629 self.notify(HiveEvent::EmergencyReceived {
2630 from_node: result.source_node,
2631 });
2632 } else if result.is_ack() {
2633 self.notify(HiveEvent::AckReceived {
2634 from_node: result.source_node,
2635 });
2636 }
2637
2638 if result.counter_changed {
2639 self.notify(HiveEvent::DocumentSynced {
2640 from_node: result.source_node,
2641 total_count: result.total_count,
2642 });
2643 }
2644
2645 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2646 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2647
2648 Some(DataReceivedResult {
2649 source_node: result.source_node,
2650 is_emergency: result.is_emergency(),
2651 is_ack: result.is_ack(),
2652 counter_changed: result.counter_changed,
2653 emergency_changed: result.emergency_changed,
2654 total_count: result.total_count,
2655 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2656 relay_data: None,
2657 origin_node: None,
2658 hop_count: 0,
2659 callsign,
2660 battery_percent,
2661 heart_rate,
2662 event_type,
2663 latitude,
2664 longitude,
2665 altitude,
2666 })
2667 }
2668
2669 fn process_document_data(
2671 &self,
2672 source_node: NodeId,
2673 data: &[u8],
2674 now_ms: u64,
2675 relay_data: Option<Vec<u8>>,
2676 origin_node: Option<NodeId>,
2677 hop_count: u8,
2678 ) -> Option<DataReceivedResult> {
2679 let source_hint = format!("node:{:08X}", source_node.as_u32());
2681 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2682
2683 if DeltaDocument::is_delta_document(&decrypted) {
2685 return self.process_delta_document_internal(
2686 source_node,
2687 &decrypted,
2688 now_ms,
2689 relay_data,
2690 origin_node,
2691 hop_count,
2692 );
2693 }
2694
2695 let result = self.document_sync.merge_document(&decrypted)?;
2697
2698 if let Some(ref peripheral) = result.peer_peripheral {
2700 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2701 peripherals.insert(result.source_node, peripheral.clone());
2702 }
2703 }
2704
2705 self.peer_manager.record_sync(source_node, now_ms);
2707
2708 if result.is_emergency() {
2710 self.notify(HiveEvent::EmergencyReceived {
2711 from_node: result.source_node,
2712 });
2713 } else if result.is_ack() {
2714 self.notify(HiveEvent::AckReceived {
2715 from_node: result.source_node,
2716 });
2717 }
2718
2719 if result.counter_changed {
2720 self.notify(HiveEvent::DocumentSynced {
2721 from_node: result.source_node,
2722 total_count: result.total_count,
2723 });
2724 }
2725
2726 if relay_data.is_some() {
2728 let relay_targets = self.get_relay_targets(Some(source_node));
2729 self.notify(HiveEvent::MessageRelayed {
2730 origin_node: origin_node.unwrap_or(result.source_node),
2731 relay_count: relay_targets.len(),
2732 hop_count,
2733 });
2734 }
2735
2736 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2737 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2738
2739 Some(DataReceivedResult {
2740 source_node: result.source_node,
2741 is_emergency: result.is_emergency(),
2742 is_ack: result.is_ack(),
2743 counter_changed: result.counter_changed,
2744 emergency_changed: result.emergency_changed,
2745 total_count: result.total_count,
2746 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2747 relay_data,
2748 origin_node,
2749 hop_count,
2750 callsign,
2751 battery_percent,
2752 heart_rate,
2753 event_type,
2754 latitude,
2755 longitude,
2756 altitude,
2757 })
2758 }
2759
2760 fn handle_relay_envelope(
2762 &self,
2763 source_node: NodeId,
2764 data: &[u8],
2765 now_ms: u64,
2766 ) -> Option<DataReceivedResult> {
2767 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2769
2770 let relay_data = if decision.should_relay {
2772 decision.relay_data()
2773 } else {
2774 None
2775 };
2776
2777 self.process_document_data(
2779 source_node,
2780 &decision.payload,
2781 now_ms,
2782 relay_data,
2783 Some(decision.origin_node),
2784 decision.hop_count,
2785 )
2786 }
2787
2788 pub fn on_ble_data(
2797 &self,
2798 identifier: &str,
2799 data: &[u8],
2800 now_ms: u64,
2801 ) -> Option<DataReceivedResult> {
2802 if data.len() >= 2 {
2804 match data[0] {
2805 KEY_EXCHANGE_MARKER => {
2806 let _response = self.handle_key_exchange(data, now_ms);
2807 return None;
2808 }
2809 PEER_E2EE_MARKER => {
2810 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2811 return None;
2812 }
2813 RELAY_ENVELOPE_MARKER => {
2814 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2816 }
2817 _ => {}
2818 }
2819 }
2820
2821 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2823 }
2824
2825 fn process_incoming_document(
2827 &self,
2828 identifier: &str,
2829 data: &[u8],
2830 now_ms: u64,
2831 relay_data: Option<Vec<u8>>,
2832 origin_node: Option<NodeId>,
2833 hop_count: u8,
2834 ) -> Option<DataReceivedResult> {
2835 let decrypted = self.decrypt_document(data, Some(identifier))?;
2837
2838 let result = self.document_sync.merge_document(&decrypted)?;
2840
2841 self.peer_manager.record_sync(result.source_node, now_ms);
2843
2844 if origin_node.is_none() {
2849 let is_new =
2851 self.peer_manager
2852 .on_incoming_connection(identifier, result.source_node, now_ms);
2853
2854 {
2856 let mut graph = self.connection_graph.lock().unwrap();
2857 if is_new {
2858 graph.on_discovered(
2859 result.source_node,
2860 identifier.to_string(),
2861 None,
2862 Some(self.config.mesh_id.clone()),
2863 -50, now_ms,
2865 );
2866 }
2867 graph.on_connected(result.source_node, now_ms);
2868 }
2869 }
2870
2871 if result.is_emergency() {
2873 self.notify(HiveEvent::EmergencyReceived {
2874 from_node: result.source_node,
2875 });
2876 } else if result.is_ack() {
2877 self.notify(HiveEvent::AckReceived {
2878 from_node: result.source_node,
2879 });
2880 }
2881
2882 if result.counter_changed {
2883 self.notify(HiveEvent::DocumentSynced {
2884 from_node: result.source_node,
2885 total_count: result.total_count,
2886 });
2887 }
2888
2889 if relay_data.is_some() {
2891 let relay_targets = self.get_relay_targets(Some(result.source_node));
2892 self.notify(HiveEvent::MessageRelayed {
2893 origin_node: origin_node.unwrap_or(result.source_node),
2894 relay_count: relay_targets.len(),
2895 hop_count,
2896 });
2897 }
2898
2899 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2900 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2901
2902 Some(DataReceivedResult {
2903 source_node: result.source_node,
2904 is_emergency: result.is_emergency(),
2905 is_ack: result.is_ack(),
2906 counter_changed: result.counter_changed,
2907 emergency_changed: result.emergency_changed,
2908 total_count: result.total_count,
2909 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2910 relay_data,
2911 origin_node,
2912 hop_count,
2913 callsign,
2914 battery_percent,
2915 heart_rate,
2916 event_type,
2917 latitude,
2918 longitude,
2919 altitude,
2920 })
2921 }
2922
2923 fn handle_relay_envelope_with_incoming(
2925 &self,
2926 identifier: &str,
2927 data: &[u8],
2928 now_ms: u64,
2929 ) -> Option<DataReceivedResult> {
2930 let envelope = RelayEnvelope::decode(data)?;
2932
2933 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2936 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2937 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2938 source_peer,
2939 envelope.origin_node,
2940 envelope.hop_count,
2941 now_ms,
2942 );
2943
2944 if is_new {
2945 log::debug!(
2946 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2947 envelope.origin_node.as_u32(),
2948 source_peer.as_u32(),
2949 envelope.hop_count
2950 );
2951 }
2952 }
2953 }
2954
2955 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2957 let stats = self
2959 .seen_cache
2960 .lock()
2961 .unwrap()
2962 .get_stats(&envelope.message_id);
2963 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2964
2965 self.notify(HiveEvent::DuplicateMessageDropped {
2966 origin_node: envelope.origin_node,
2967 seen_count,
2968 });
2969 return None;
2970 }
2971
2972 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2974 let relay_env = envelope.relay();
2975 (true, relay_env.map(|e| e.encode()))
2976 } else {
2977 if !envelope.can_relay() {
2978 self.notify(HiveEvent::MessageTtlExpired {
2979 origin_node: envelope.origin_node,
2980 hop_count: envelope.hop_count,
2981 });
2982 }
2983 (false, None)
2984 };
2985
2986 self.process_incoming_document(
2988 identifier,
2989 &envelope.payload,
2990 now_ms,
2991 if should_relay { relay_data } else { None },
2992 Some(envelope.origin_node),
2993 envelope.hop_count,
2994 )
2995 }
2996
2997 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3007 use std::sync::atomic::Ordering;
3008
3009 let now_ms_32 = now_ms as u32;
3011
3012 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3014 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3015 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3016 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3017 let removed = self.peer_manager.cleanup_stale(now_ms);
3018 for node_id in &removed {
3019 self.notify(HiveEvent::PeerLost { node_id: *node_id });
3020 }
3021 if !removed.is_empty() {
3022 self.notify_mesh_state_changed();
3023 }
3024
3025 {
3027 let mut graph = self.connection_graph.lock().unwrap();
3028 let newly_lost = graph.tick(now_ms);
3029 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3031 drop(graph);
3032
3033 for node_id in newly_lost {
3036 if !removed.contains(&node_id) {
3038 self.notify(HiveEvent::PeerLost { node_id });
3039 }
3040 }
3041 }
3042 }
3043
3044 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3046 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3047 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3048 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3049 if self.peer_manager.connected_count() > 0 {
3051 let doc = self.document_sync.build_document();
3052 return Some(self.encrypt_document(&doc));
3053 }
3054 }
3055
3056 None
3057 }
3058
3059 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3068 use std::sync::atomic::Ordering;
3069 let now_ms_32 = now_ms as u32;
3070
3071 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3073 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3074 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3075 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3076 let removed = self.peer_manager.cleanup_stale(now_ms);
3077 for node_id in &removed {
3078 self.notify(HiveEvent::PeerLost { node_id: *node_id });
3079 }
3080 if !removed.is_empty() {
3081 self.notify_mesh_state_changed();
3082 }
3083
3084 {
3086 let mut graph = self.connection_graph.lock().unwrap();
3087 let newly_lost = graph.tick(now_ms);
3088 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3089 drop(graph);
3090
3091 for node_id in newly_lost {
3092 if !removed.contains(&node_id) {
3093 self.notify(HiveEvent::PeerLost { node_id });
3094 }
3095 }
3096 }
3097 }
3098
3099 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3101 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3102 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3103 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3104
3105 let doc = self.document_sync.build_document();
3107 let encrypted = self.encrypt_document(&doc);
3108 let mut results = Vec::new();
3109 for peer in self.get_connected_peers() {
3110 results.push((peer.node_id, encrypted.clone()));
3111 }
3112 return results;
3113 }
3114
3115 Vec::new()
3116 }
3117
    /// Returns the peers reported by the peer manager's `get_peers()`.
    pub fn get_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_peers()
    }
3124
    /// Returns the peers reported by the peer manager's `get_connected_peers()`.
    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_connected_peers()
    }
3129
    /// Looks up a single peer by node ID via the peer manager.
    ///
    /// Returns `None` when the peer manager's lookup yields nothing.
    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
        self.peer_manager.get_peer(node_id)
    }
3134
    /// Returns the peer manager's `peer_count()`.
    pub fn peer_count(&self) -> usize {
        self.peer_manager.peer_count()
    }
3139
    /// Returns the peer manager's `connected_count()`.
    pub fn connected_count(&self) -> usize {
        self.peer_manager.connected_count()
    }
3144
    /// Asks the peer manager whether a device advertising `device_mesh_id`
    /// matches this mesh.
    ///
    /// The matching rules, including how `None` is treated, are defined by the
    /// peer manager's `matches_mesh`.
    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
        self.peer_manager.matches_mesh(device_mesh_id)
    }
3149
    /// Returns an owned copy of every entry in the connection graph
    /// (`get_all_owned`).
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
        self.connection_graph.lock().unwrap().get_all_owned()
    }
3176
3177 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
3179 self.connection_graph
3180 .lock()
3181 .unwrap()
3182 .get_peer(node_id)
3183 .cloned()
3184 }
3185
3186 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
3188 self.connection_graph
3189 .lock()
3190 .unwrap()
3191 .get_connected()
3192 .into_iter()
3193 .cloned()
3194 .collect()
3195 }
3196
3197 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
3199 self.connection_graph
3200 .lock()
3201 .unwrap()
3202 .get_degraded()
3203 .into_iter()
3204 .cloned()
3205 .collect()
3206 }
3207
3208 pub fn get_recently_disconnected(
3212 &self,
3213 within_ms: u64,
3214 now_ms: u64,
3215 ) -> Vec<PeerConnectionState> {
3216 self.connection_graph
3217 .lock()
3218 .unwrap()
3219 .get_recently_disconnected(within_ms, now_ms)
3220 .into_iter()
3221 .cloned()
3222 .collect()
3223 }
3224
3225 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
3227 self.connection_graph
3228 .lock()
3229 .unwrap()
3230 .get_lost()
3231 .into_iter()
3232 .cloned()
3233 .collect()
3234 }
3235
    /// Returns the connection graph's `state_counts()` summary.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_connection_state_counts(&self) -> StateCountSummary {
        self.connection_graph.lock().unwrap().state_counts()
    }
3240
    /// Returns owned copies of the indirect peers tracked by the connection
    /// graph (`get_indirect_peers_owned`).
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_indirect_peers_owned()
    }
3254
    /// Returns the connection graph's `peer_degree(node_id)`; `None` is
    /// whatever the graph returns for a node it cannot classify.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
        self.connection_graph.lock().unwrap().peer_degree(node_id)
    }
3264
    /// Returns the connection graph's `full_state_counts()` summary.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
        self.connection_graph.lock().unwrap().full_state_counts()
    }
3272
    /// Returns the connection graph's `get_paths_to(node_id)` result.
    ///
    /// Each tuple pairs a `NodeId` with a `u8`.
    // NOTE(review): presumably (via-peer, hop count) — confirm against
    // `ConnectionStateGraph::get_paths_to`.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
        self.connection_graph.lock().unwrap().get_paths_to(node_id)
    }
3280
    /// Returns the connection graph's `is_known(node_id)`.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
        self.connection_graph.lock().unwrap().is_known(node_id)
    }
3285
    /// Returns the connection graph's `indirect_peer_count()`.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn indirect_peer_count(&self) -> usize {
        self.connection_graph.lock().unwrap().indirect_peer_count()
    }
3290
    /// Runs the connection graph's `cleanup_indirect(now_ms)` and returns the
    /// node IDs it reports.
    // NOTE(review): presumably the IDs of the indirect peers that were
    // removed — confirm against `ConnectionStateGraph::cleanup_indirect`.
    ///
    /// # Panics
    ///
    /// Panics if locking `connection_graph` fails.
    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
        self.connection_graph
            .lock()
            .unwrap()
            .cleanup_indirect(now_ms)
    }
3301
    /// Returns `total_count()` of the synchronized document.
    pub fn total_count(&self) -> u64 {
        self.document_sync.total_count()
    }
3306
    /// Returns `version()` of the synchronized document.
    ///
    /// Returns the same value as [`version`](Self::version).
    pub fn document_version(&self) -> u32 {
        self.document_sync.version()
    }
3311
    /// Returns `version()` of the synchronized document.
    ///
    /// Returns the same value as [`document_version`](Self::document_version).
    pub fn version(&self) -> u32 {
        self.document_sync.version()
    }
3316
    /// Forwards `battery_percent` to the document's `update_health`.
    pub fn update_health(&self, battery_percent: u8) {
        self.document_sync.update_health(battery_percent);
    }
3321
    /// Forwards `activity` to the document's `update_activity`.
    pub fn update_activity(&self, activity: u8) {
        self.document_sync.update_activity(activity);
    }
3326
    /// Forwards `battery_percent` and `activity` together to the document's
    /// `update_health_full`.
    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
        self.document_sync
            .update_health_full(battery_percent, activity);
    }
3332
    /// Forwards `heart_rate` to the document's `update_heart_rate`.
    pub fn update_heart_rate(&self, heart_rate: u8) {
        self.document_sync.update_heart_rate(heart_rate);
    }
3337
    /// Forwards a position to the document's `update_location`.
    ///
    /// `altitude` is optional and passed through as given.
    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
        self.document_sync
            .update_location(latitude, longitude, altitude);
    }
3343
    /// Calls the document's `clear_location`.
    pub fn clear_location(&self) {
        self.document_sync.clear_location();
    }
3348
    /// Forwards `callsign` to the document's `update_callsign`.
    pub fn update_callsign(&self, callsign: &str) {
        self.document_sync.update_callsign(callsign);
    }
3353
    /// Forwards `event_type` and `timestamp` to the document's
    /// `set_peripheral_event`.
    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
        self.document_sync
            .set_peripheral_event(event_type, timestamp);
    }
3359
    /// Calls the document's `clear_peripheral_event`.
    pub fn clear_peripheral_event(&self) {
        self.document_sync.clear_peripheral_event();
    }
3364
    /// Forwards a full set of peripheral fields to the document's
    /// `update_peripheral_state` in a single call.
    ///
    /// Every argument is passed through unchanged, in the same order; how
    /// `None` values are interpreted is defined by `DocumentSync`.
    // The argument count mirrors the underlying call, hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub fn update_peripheral_state(
        &self,
        callsign: &str,
        battery_percent: u8,
        heart_rate: Option<u8>,
        latitude: Option<f32>,
        longitude: Option<f32>,
        altitude: Option<f32>,
        event_type: Option<EventType>,
        timestamp: u64,
    ) {
        self.document_sync.update_peripheral_state(
            callsign,
            battery_percent,
            heart_rate,
            latitude,
            longitude,
            altitude,
            event_type,
            timestamp,
        );
    }
3392
3393 pub fn build_document(&self) -> Vec<u8> {
3397 let doc = self.document_sync.build_document();
3398 self.encrypt_document(&doc)
3399 }
3400
    /// Returns the peers the peer manager reports via
    /// `peers_needing_sync(now_ms)`.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
3405
    /// Hands `event` to the registered observers via `self.observers.notify`.
    fn notify(&self, event: HiveEvent) {
        self.observers.notify(event);
    }
3411
3412 fn notify_mesh_state_changed(&self) {
3413 self.notify(HiveEvent::MeshStateChanged {
3414 peer_count: self.peer_manager.peer_count(),
3415 connected_count: self.peer_manager.connected_count(),
3416 });
3417 }
3418
3419 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3439 let mut id_bytes = [0u8; 16];
3442 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3443 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3444 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3445
3446 let seen = self.seen_cache.lock().unwrap();
3448 !seen.has_seen(&message_id)
3449 }
3450
3451 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3456 let now = std::time::SystemTime::now()
3457 .duration_since(std::time::UNIX_EPOCH)
3458 .map(|d| d.as_millis() as u64)
3459 .unwrap_or(0);
3460
3461 let mut id_bytes = [0u8; 16];
3463 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3464 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3465 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3466 let origin = NodeId::new(source_node);
3467
3468 let mut seen = self.seen_cache.lock().unwrap();
3469 seen.mark_seen(message_id, origin, now);
3470 }
3471
    /// Returns the identifier strings the peer manager reports via
    /// `get_connected_identifiers()`.
    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
        self.peer_manager.get_connected_identifiers()
    }
3479}
3480
/// Outcome of processing one incoming document.
///
/// NOTE(review): field meanings below are inferred from field names and from
/// how the tests in this file read them; confirm against the code that
/// constructs this struct.
#[derive(Debug, Clone)]
pub struct DataReceivedResult {
    /// Node the received data is attributed to.
    pub source_node: NodeId,

    /// `true` when the received document carried an emergency (the tests see
    /// this set after receiving a `send_emergency` document).
    pub is_emergency: bool,

    /// `true` when the received document carried an ACK (the tests see this
    /// set after receiving a `send_ack` document).
    pub is_ack: bool,

    /// Presumably whether merging changed the local counter state — TODO confirm.
    pub counter_changed: bool,

    /// Presumably whether merging changed the local emergency state — TODO confirm.
    pub emergency_changed: bool,

    /// Presumably the document's total count after processing — TODO confirm.
    pub total_count: u64,

    /// Timestamp associated with the event; units presumably milliseconds — TODO confirm.
    pub event_timestamp: u64,

    /// Bytes to forward to other peers, if relaying applies; `None` otherwise.
    pub relay_data: Option<Vec<u8>>,

    /// Originating node when the data arrived inside a relay envelope.
    pub origin_node: Option<NodeId>,

    /// Hop count reported by the relay envelope (see `origin_node`).
    pub hop_count: u8,

    /// Sender's callsign, when a peripheral with a non-empty callsign was present.
    pub callsign: Option<String>,

    /// Sender's battery percentage; presumably `None` when absent or zero — TODO confirm.
    pub battery_percent: Option<u8>,

    /// Sender's heart rate, when reported.
    pub heart_rate: Option<u8>,

    /// Raw `u8` value of the sender's last event type, when one was present.
    pub event_type: Option<u8>,

    /// Sender's latitude, when a location was present.
    pub latitude: Option<f32>,

    /// Sender's longitude, when a location was present.
    pub longitude: Option<f32>,

    /// Sender's altitude, when a location with an altitude was present.
    pub altitude: Option<f32>,
}
3539
3540impl DataReceivedResult {
3541 #[allow(clippy::type_complexity)]
3543 fn peripheral_fields(
3544 peripheral: &Option<crate::sync::crdt::Peripheral>,
3545 ) -> (
3546 Option<String>,
3547 Option<u8>,
3548 Option<u8>,
3549 Option<u8>,
3550 Option<f32>,
3551 Option<f32>,
3552 Option<f32>,
3553 ) {
3554 match peripheral {
3555 Some(p) => {
3556 let callsign = {
3557 let s = p.callsign_str();
3558 if s.is_empty() {
3559 None
3560 } else {
3561 Some(s.to_string())
3562 }
3563 };
3564 let battery = if p.health.battery_percent > 0 {
3565 Some(p.health.battery_percent)
3566 } else {
3567 None
3568 };
3569 let heart_rate = p.health.heart_rate;
3570 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3571 let (lat, lon, alt) = match &p.location {
3572 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3573 None => (None, None, None),
3574 };
3575 (callsign, battery, heart_rate, event_type, lat, lon, alt)
3576 }
3577 None => (None, None, None, None, None, None, None),
3578 }
3579 }
3580}
3581
/// Result of evaluating a relayed message: the unwrapped payload plus the
/// information needed to forward it.
///
/// NOTE(review): field meanings are inferred from names and from
/// `RelayDecision::relay_data`; confirm against the code that constructs this.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Payload bytes carried by the message.
    pub payload: Vec<u8>,

    /// Node that originated the message.
    pub origin_node: NodeId,

    /// Hop count associated with the message.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward; `relay_data()` encodes it when present.
    pub relay_envelope: Option<RelayEnvelope>,
}
3602
impl RelayDecision {
    /// Returns the encoded bytes of `relay_envelope`, or `None` when no
    /// envelope is set.
    pub fn relay_data(&self) -> Option<Vec<u8>> {
        self.relay_envelope.as_ref().map(|e| e.encode())
    }
}
3611
3612#[cfg(all(test, feature = "std"))]
3613mod tests {
3614 use super::*;
3615 use crate::observer::CollectingObserver;
3616
3617 const TEST_TIMESTAMP: u64 = 1705276800000;
3619
    /// Builds a mesh in the "TEST" mesh with the given node ID and callsign,
    /// using the remaining `HiveMeshConfig::new` defaults.
    fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
        HiveMesh::new(config)
    }
3624
    /// A freshly created mesh exposes the configured identity values.
    #[test]
    fn test_mesh_creation() {
        let mesh = create_mesh(0x12345678, "ALPHA-1");

        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
        assert_eq!(mesh.callsign(), "ALPHA-1");
        assert_eq!(mesh.mesh_id(), "TEST");
        // Device name combines the mesh ID and the node ID in hex.
        assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
    }
3634
    /// Discovering a device from the same mesh yields a peer and emits
    /// `PeerDiscovered` and `MeshStateChanged`.
    #[test]
    fn test_peer_discovery() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // Advertised name carries the peer's node ID (22222222).
        let peer = mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );

        assert!(peer.is_some());
        let peer = peer.unwrap();
        assert_eq!(peer.node_id.as_u32(), 0x22222222);

        // Both events must have reached the observer.
        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
    }
3663
    /// Discover → connect → disconnect updates the connected count and emits
    /// `PeerConnected` / `PeerDisconnected`.
    #[test]
    fn test_connection_lifecycle() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // The peer must be discovered before it can connect.
        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );

        // Connecting resolves the identifier to the discovered node ID.
        let node_id = mesh.on_ble_connected("device-uuid", 2000);
        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
        assert_eq!(mesh.connected_count(), 1);

        // Disconnecting returns the same node ID and drops the count.
        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
        assert_eq!(mesh.connected_count(), 0);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
    }
3697
    /// An emergency sent by one mesh is reported as an emergency by the
    /// receiving mesh, including an `EmergencyReceived` event.
    #[test]
    fn test_emergency_flow() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        // Sender raises the emergency and keeps it active locally.
        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
        assert!(mesh1.is_emergency_active());

        // Receiver processes the document as coming from mesh1.
        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_emergency);
        assert_eq!(result.source_node.as_u32(), 0x11111111);

        let events = observer2.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
    }
3725
    /// An ACK sent by one mesh is reported as an ACK by the receiving mesh,
    /// including an `AckReceived` event.
    #[test]
    fn test_ack_flow() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        // Sender issues the ACK and keeps it active locally.
        let doc = mesh1.send_ack(TEST_TIMESTAMP);
        assert!(mesh1.is_ack_active());

        // Receiver processes the document as coming from mesh1.
        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_ack);

        let events = observer2.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
    }
3752
    /// `tick` removes a peer once it has been silent for longer than the
    /// configured peer timeout, and emits `PeerLost`.
    #[test]
    fn test_tick_cleanup() {
        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_peer_timeout(10_000);
        let mesh = HiveMesh::new(config);

        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // Peer last seen at t = 1000 ms.
        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );
        assert_eq!(mesh.peer_count(), 1);

        // t = 5000 ms: still within the 10 s timeout, peer is kept.
        mesh.tick(5000);
        assert_eq!(mesh.peer_count(), 1);

        // t = 20000 ms: timeout exceeded, peer is removed.
        mesh.tick(20000);
        assert_eq!(mesh.peer_count(), 0);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
    }
3786
    /// With a connected peer, `tick` returns a document only once per sync
    /// interval (5000 ms here).
    #[test]
    fn test_tick_sync_broadcast() {
        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_sync_interval(5000);
        let mesh = HiveMesh::new(config);

        // A connected peer is required for `tick` to produce a broadcast.
        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );
        mesh.on_ble_connected("device-uuid", 1000);

        // Prime the timers at t = 0; the result is irrelevant.
        let _result = mesh.tick(0);
        // 3000 ms elapsed: below the sync interval.
        let result = mesh.tick(3000);
        assert!(result.is_none());

        // 6000 ms elapsed: interval reached, broadcast produced.
        let result = mesh.tick(6000);
        assert!(result.is_some());

        // Only 100 ms since the last broadcast: nothing to send.
        let result = mesh.tick(6100);
        assert!(result.is_none());

        // 6000 ms since the last broadcast: next one is due.
        let result = mesh.tick(12000);
        assert!(result.is_some());
    }
3823
    /// An incoming connection from an unknown node registers it as a new,
    /// connected peer and emits `PeerDiscovered` and `PeerConnected`.
    #[test]
    fn test_incoming_connection() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // No prior discovery: the connection itself introduces the peer.
        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);

        assert!(is_new);
        assert_eq!(mesh.peer_count(), 1);
        assert_eq!(mesh.connected_count(), 1);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
    }
3846
    /// Devices advertising a different mesh ID are ignored; devices from the
    /// same mesh are accepted.
    #[test]
    fn test_mesh_filtering() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        // Mesh ID "OTHER" does not match this mesh ("TEST"): rejected.
        let peer = mesh.on_ble_discovered(
            "device-uuid-1",
            Some("HIVE_OTHER-22222222"),
            -65,
            Some("OTHER"),
            1000,
        );
        assert!(peer.is_none());
        assert_eq!(mesh.peer_count(), 0);

        // Mesh ID "TEST" matches: accepted.
        let peer = mesh.on_ble_discovered(
            "device-uuid-2",
            Some("HIVE_TEST-33333333"),
            -65,
            Some("TEST"),
            1000,
        );
        assert!(peer.is_some());
        assert_eq!(mesh.peer_count(), 1);
    }
3873
    /// Builds a "TEST" mesh with mesh-wide encryption enabled using `secret`.
    fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
        let config =
            HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
        HiveMesh::new(config)
    }
3881
    /// Configuring a secret turns encryption on.
    #[test]
    fn test_encryption_enabled() {
        let secret = [0x42u8; 32];
        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
    }
3889
    /// A mesh created without a secret has encryption off.
    #[test]
    fn test_encryption_disabled_by_default() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        assert!(!mesh.is_encryption_enabled());
    }
3896
    /// Two meshes sharing a secret can exchange an encrypted document.
    #[test]
    fn test_encrypted_document_exchange() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let doc = mesh1.build_document();

        // Encrypted documents start with the encrypted marker byte.
        assert!(doc.len() >= 2);
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        // The receiver holds the same secret, so processing succeeds.
        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.source_node.as_u32(), 0x11111111);
    }
3917
    /// An emergency survives encryption: the receiver reports it and emits
    /// `EmergencyReceived`.
    #[test]
    fn test_encrypted_emergency_exchange() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let observer = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer.clone());

        // Emergency document produced by an encrypting sender.
        let doc = mesh1.send_emergency(TEST_TIMESTAMP);

        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_emergency);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
    }
3944
    /// A document encrypted with one secret is rejected by a mesh holding a
    /// different secret.
    #[test]
    fn test_wrong_key_fails_decrypt() {
        let secret1 = [0x42u8; 32];
        // Deliberately different from `secret1`.
        let secret2 = [0x43u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);

        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_none());
    }
3960
    /// Two meshes without encryption can exchange a plain document.
    #[test]
    fn test_unencrypted_mesh_can_read_unencrypted() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_some());
    }
3974
    /// In the default (non-strict) mode an encrypting mesh still accepts a
    /// plain document.
    #[test]
    fn test_encrypted_mesh_can_receive_unencrypted() {
        let secret = [0x42u8; 32];
        // Sender has no encryption.
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        // Receiver has encryption enabled (non-strict).
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_some());
    }
3990
    /// A mesh without a secret cannot process an encrypted document.
    #[test]
    fn test_unencrypted_mesh_cannot_receive_encrypted() {
        let secret = [0x42u8; 32];
        // Sender encrypts.
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        // Receiver has no key.
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");
        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_none());
    }
4005
    /// Encryption can be toggled at runtime, and built documents reflect the
    /// current setting via their first byte.
    #[test]
    fn test_enable_disable_encryption() {
        let mut mesh = create_mesh(0x11111111, "ALPHA-1");

        assert!(!mesh.is_encryption_enabled());

        // Turn encryption on.
        let secret = [0x42u8; 32];
        mesh.enable_encryption(&secret);
        assert!(mesh.is_encryption_enabled());

        // Documents now carry the encrypted marker.
        let doc = mesh.build_document();
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        // Turn encryption off again.
        mesh.disable_encryption();
        assert!(!mesh.is_encryption_enabled());

        // Documents are plain again.
        let doc = mesh.build_document();
        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
    }
4029
    /// Encrypting a document adds a fixed 30 bytes compared to the plain form.
    #[test]
    fn test_encryption_overhead() {
        let secret = [0x42u8; 32];
        let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");

        let doc_encrypted = mesh_encrypted.build_document();
        let doc_unencrypted = mesh_unencrypted.build_document();

        let overhead = doc_encrypted.len() - doc_unencrypted.len();
        // NOTE(review): presumably 2-byte marker + 12-byte nonce + 16-byte
        // auth tag — confirm against the encryption format.
        assert_eq!(overhead, 30);
    }
4047
    /// Peer E2EE is off by default, exposes a public key once enabled, and
    /// can be disabled again.
    #[test]
    fn test_peer_e2ee_enable_disable() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        // Off by default: no public key available.
        assert!(!mesh.is_peer_e2ee_enabled());
        assert!(mesh.peer_e2ee_public_key().is_none());

        mesh.enable_peer_e2ee();
        assert!(mesh.is_peer_e2ee_enabled());
        assert!(mesh.peer_e2ee_public_key().is_some());

        mesh.disable_peer_e2ee();
        assert!(!mesh.is_peer_e2ee_enabled());
    }
4064
    /// Initiating a session yields a key-exchange message and creates a
    /// session that is not yet established.
    #[test]
    fn test_peer_e2ee_initiate_session() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        mesh.enable_peer_e2ee();

        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert!(key_exchange.is_some());

        // Key-exchange messages start with their marker byte.
        let key_exchange = key_exchange.unwrap();
        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);

        // One session exists, but it is still awaiting the peer's response.
        assert_eq!(mesh.peer_e2ee_session_count(), 1);
        assert_eq!(mesh.peer_e2ee_established_count(), 0);
    }
4081
    /// A full key exchange establishes a session on both sides and each side
    /// emits `PeerE2eeEstablished`.
    #[test]
    fn test_peer_e2ee_full_handshake() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        let observer1 = Arc::new(CollectingObserver::new());
        let observer2 = Arc::new(CollectingObserver::new());
        mesh1.add_observer(observer1.clone());
        mesh2.add_observer(observer2.clone());

        // Step 1: mesh1 initiates towards mesh2.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();

        // Step 2: mesh2 handles the request and produces a response.
        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
        assert!(response.is_some());

        // mesh2 now has a session with mesh1.
        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));

        // Step 3: mesh1 handles the response, completing its side.
        let key_exchange2 = response.unwrap();
        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);

        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));

        // Both sides must have announced the established session.
        let events1 = observer1.events();
        assert!(events1
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));

        let events2 = observer2.events();
        assert!(events2
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
    }
4125
    /// After a handshake, a message encrypted by one peer is decrypted by the
    /// other and surfaced through `PeerE2eeMessageReceived`.
    #[test]
    fn test_peer_e2ee_encrypt_decrypt() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Establish the session (request, response, completion).
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        // mesh1 encrypts a message for mesh2.
        let plaintext = b"Secret message from mesh1";
        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
        assert!(encrypted.is_some());

        // Peer E2EE messages start with their marker byte.
        let encrypted = encrypted.unwrap();
        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);

        // Observer is attached only now so it sees just the message event.
        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
        assert!(decrypted.is_some());
        assert_eq!(decrypted.unwrap(), plaintext);

        // The event must carry the sender and the decrypted payload.
        let events = observer2.events();
        assert!(events.iter().any(|e| matches!(
            e,
            HiveEvent::PeerE2eeMessageReceived { from_node, data }
            if from_node.as_u32() == 0x11111111 && data == plaintext
        )));
    }
4166
    /// An established session works in both directions.
    #[test]
    fn test_peer_e2ee_bidirectional() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Establish the session.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        // mesh1 → mesh2
        let msg1 = mesh1
            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
            .unwrap();
        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
        assert_eq!(dec1, b"Hello from mesh1");

        // mesh2 → mesh1
        let msg2 = mesh2
            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
            .unwrap();
        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
        assert_eq!(dec2, b"Hello from mesh2");
    }
4196
    /// Closing a session emits `PeerE2eeClosed`.
    #[test]
    fn test_peer_e2ee_close_session() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        mesh.enable_peer_e2ee();

        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // Create a (pending) session so there is something to close.
        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert_eq!(mesh.peer_e2ee_session_count(), 1);

        mesh.close_peer_e2ee(NodeId::new(0x22222222));

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeClosed { .. })));
    }
4218
    /// With peer E2EE disabled, initiating and sending both return `None` and
    /// no session exists.
    #[test]
    fn test_peer_e2ee_without_enabling() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        // E2EE was never enabled on this mesh.
        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert!(result.is_none());

        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
        assert!(result.is_none());

        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
    }
4232
    /// A peer E2EE message is exactly 46 bytes longer than its plaintext.
    #[test]
    fn test_peer_e2ee_overhead() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Establish the session.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        let plaintext = b"Test message";
        let encrypted = mesh1
            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
            .unwrap();

        let overhead = encrypted.len() - plaintext.len();
        // NOTE(review): the breakdown of the 46 bytes (marker, addressing,
        // counter, nonce, tag) is defined by `PeerEncryptedMessage` — confirm there.
        assert_eq!(overhead, 46);
    }
4265
    /// Builds a "TEST" mesh with encryption enabled using `secret` and strict
    /// encryption mode turned on.
    fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
        let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
            .with_encryption(secret)
            .with_strict_encryption();
        HiveMesh::new(config)
    }
4274
    /// Strict mode configured together with a secret reports both flags on.
    #[test]
    fn test_strict_encryption_enabled() {
        let secret = [0x42u8; 32];
        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
        assert!(mesh.is_strict_encryption_enabled());
    }
4283
    /// Enabling encryption alone does not enable strict mode.
    #[test]
    fn test_strict_encryption_disabled_by_default() {
        let secret = [0x42u8; 32];
        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
        assert!(!mesh.is_strict_encryption_enabled());
    }
4292
    /// Requesting strict mode without a secret leaves both encryption and
    /// strict mode reported as off.
    #[test]
    fn test_strict_encryption_requires_encryption_enabled() {
        // Strict mode requested, but no encryption secret configured.
        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_strict_encryption();
        let mesh = HiveMesh::new(config);

        assert!(!mesh.is_encryption_enabled());
        assert!(!mesh.is_strict_encryption_enabled());
    }
4303
    /// A strict-mode receiver accepts documents encrypted with the shared
    /// secret.
    #[test]
    fn test_strict_mode_accepts_encrypted_documents() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        // Sanity check: the sender really produced an encrypted document.
        let doc = mesh1.build_document();
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
        assert!(result.is_some());
    }
4318
    /// A strict-mode receiver rejects plain documents and emits a
    /// `SecurityViolation` of kind `UnencryptedInStrictMode`.
    #[test]
    fn test_strict_mode_rejects_unencrypted_documents() {
        let secret = [0x42u8; 32];
        // Sender has no encryption.
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        // Receiver is in strict mode.
        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
        let observer = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer.clone());

        // Sanity check: the sender really produced a plain document.
        let doc = mesh1.build_document();
        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
        assert!(result.is_none());

        let events = observer.events();
        assert!(events.iter().any(|e| matches!(
            e,
            HiveEvent::SecurityViolation {
                kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                ..
            }
        )));
    }
4346
    /// Without strict mode, an encrypting receiver accepts plain documents.
    #[test]
    fn test_non_strict_mode_accepts_unencrypted_documents() {
        let secret = [0x42u8; 32];
        // Sender has no encryption.
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        // Receiver encrypts but is not strict.
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
        assert!(result.is_some());
    }
4360
    /// The strict-mode `SecurityViolation` event carries the identifier of
    /// the device the plain document arrived from.
    #[test]
    fn test_strict_mode_security_violation_event_includes_source() {
        let secret = [0x42u8; 32];
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let observer = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer.clone());

        let doc = mesh1.build_document();

        // Register and connect the sender so the identifier-based receive
        // path can be used.
        mesh2.on_ble_discovered(
            "test-device-uuid",
            Some("HIVE_TEST-11111111"),
            -65,
            Some("TEST"),
            500,
        );
        mesh2.on_ble_connected("test-device-uuid", 600);

        // Receive via the identifier; the return value is not under test.
        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);

        let events = observer.events();
        let violation = events.iter().find(|e| {
            matches!(
                e,
                HiveEvent::SecurityViolation {
                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                    ..
                }
            )
        });
        assert!(violation.is_some());

        // The event's source must be the device identifier used above.
        if let Some(HiveEvent::SecurityViolation { source, .. }) = violation {
            assert!(source.is_some());
            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
        }
    }
4402
/// Two encrypted meshes configured with different secrets: the receiver is expected to return
/// `None` for the sender's document and to emit a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    use crate::observer::SecurityViolationKind;

    let sender_secret = [0x42u8; 32];
    let receiver_secret = [0x43u8; 32];

    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_secret);

    let collector = Arc::new(CollectingObserver::new());
    receiver.add_observer(collector.clone());

    let foreign_doc = sender.build_document();
    let outcome =
        receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &foreign_doc, 1000);
    assert!(outcome.is_none());

    let recorded = collector.events();
    let failure_reported = recorded.iter().any(|event| {
        matches!(
            event,
            HiveEvent::SecurityViolation {
                kind: SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(failure_reported);
}
4430
/// The encryption builders can be chained with the other `with_*` builders, and the resulting
/// mesh reports both encryption and strict encryption as enabled.
#[test]
fn test_strict_mode_builder_chain() {
    let mesh = HiveMesh::new(
        HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption([0x42u8; 32])
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    assert!(mesh.is_encryption_enabled());
    assert!(mesh.is_strict_encryption_enabled());
}
4445
4446 fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
4449 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4450 HiveMesh::new(config)
4451 }
4452
/// A mesh from the plain `create_mesh` helper must report relay as disabled.
#[test]
fn test_relay_disabled_by_default() {
    let relay_on = create_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(!relay_on);
}
4458
/// A mesh from `create_relay_mesh` must report relay as enabled.
#[test]
fn test_relay_enabled() {
    let relay_on = create_relay_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(relay_on);
}
4464
/// Each relay builder method must store its argument in the matching public config field.
#[test]
fn test_relay_config_builder() {
    // Pull out just the relay-related fields; everything else is irrelevant to this test.
    let HiveMeshConfig {
        enable_relay,
        max_relay_hops,
        relay_fanout,
        seen_cache_ttl_ms,
        ..
    } = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(enable_relay);
    assert_eq!(max_relay_hops, 5);
    assert_eq!(relay_fanout, 3);
    assert_eq!(seen_cache_ttl_ms, 60_000);
}
4478
/// Marking the same message id twice: the first call is expected to return `true`, the second
/// `false`, and the seen-cache must end up holding exactly one entry.
#[test]
fn test_seen_message_deduplication() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);
    let id = MessageId::from_content(sender, 1000, 0xDEADBEEF);

    let first_attempt = mesh.mark_message_seen(id, sender, 1000);
    assert!(first_attempt);

    let second_attempt = mesh.mark_message_seen(id, sender, 2000);
    assert!(!second_attempt);

    assert_eq!(mesh.seen_cache_size(), 1);
}
4493
/// `wrap_for_relay` output must start with the relay envelope marker and decode back into an
/// envelope with the original payload, this node as origin, and a hop count of zero.
#[test]
fn test_wrap_for_relay() {
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let body = vec![1, 2, 3, 4, 5];
    let framed = mesh.wrap_for_relay(body.clone());
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.payload, body);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4510
/// A first-seen broadcast envelope (max hops 7) must produce a decision that exposes the
/// payload and origin, reports hop count 0, asks for relaying, and carries a follow-up
/// envelope whose hop count is 1.
#[test]
fn test_process_relay_envelope_new_message() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let body = vec![1, 2, 3, 4, 5];
    let incoming = RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone()).with_max_hops(7);
    let wire_bytes = incoming.encode();

    let verdict = mesh.process_relay_envelope(&wire_bytes, NodeId::new(0x33333333), 1000);
    assert!(verdict.is_some());

    let verdict = verdict.unwrap();
    assert_eq!(verdict.payload, body);
    assert_eq!(verdict.origin_node.as_u32(), 0x22222222);
    assert_eq!(verdict.hop_count, 0);
    assert!(verdict.should_relay);
    assert!(verdict.relay_envelope.is_some());

    let forwarded = verdict.relay_envelope.unwrap();
    assert_eq!(forwarded.hop_count, 1);
}
4539
/// Processing the same encoded envelope twice: the first pass must yield a decision, the
/// second must yield `None` and leave a `DuplicateMessageDropped` event with observers.
#[test]
fn test_process_relay_envelope_duplicate() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let body = vec![1, 2, 3, 4, 5];
    let incoming = RelayEnvelope::broadcast(NodeId::new(0x22222222), body);
    let wire_bytes = incoming.encode();

    let first_pass = mesh.process_relay_envelope(&wire_bytes, NodeId::new(0x33333333), 1000);
    assert!(first_pass.is_some());

    let second_pass = mesh.process_relay_envelope(&wire_bytes, NodeId::new(0x33333333), 2000);
    assert!(second_pass.is_none());

    let recorded = collector.events();
    let drop_reported = recorded
        .iter()
        .any(|event| matches!(event, HiveEvent::DuplicateMessageDropped { .. }));
    assert!(drop_reported);
}
4564
/// An envelope with max hops 3 that has already been relayed three times must still deliver
/// its payload, but the decision must not request further relaying and a `MessageTtlExpired`
/// event must be observed.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let collector = Arc::new(CollectingObserver::new());
    mesh.add_observer(collector.clone());

    let body = vec![1, 2, 3, 4, 5];
    let mut hopped =
        RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone()).with_max_hops(3);

    // Apply three relay steps so the envelope arrives with its hop budget used up.
    for _ in 0..3 {
        hopped = hopped.relay().unwrap();
    }
    let wire_bytes = hopped.encode();

    let verdict = mesh.process_relay_envelope(&wire_bytes, NodeId::new(0x33333333), 1000);
    assert!(verdict.is_some());

    let verdict = verdict.unwrap();
    assert_eq!(verdict.payload, body);
    assert!(!verdict.should_relay);
    assert!(verdict.relay_envelope.is_none());

    let recorded = collector.events();
    let expiry_reported = recorded
        .iter()
        .any(|event| matches!(event, HiveEvent::MessageTtlExpired { .. }));
    assert!(expiry_reported);
}
4599
/// `build_relay_document` must emit bytes that start with the relay envelope marker, decode to
/// an envelope originating from this node, and carry a payload `HiveDocument::decode` accepts.
#[test]
fn test_build_relay_document() {
    use crate::document::HiveDocument;
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let framed = mesh.build_relay_document();
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    let inner_doc = HiveDocument::decode(&decoded.payload);
    assert!(inner_doc.is_some());
}
4617
/// After three peers are discovered and connected, asking for relay targets with one of them
/// given as the source must never return that source peer.
#[test]
fn test_relay_targets_excludes_source() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    // (identifier, advertised name, RSSI) for each peer, registered in this order.
    let neighbours = [
        ("peer-1", "HIVE_TEST-22222222", -60),
        ("peer-2", "HIVE_TEST-33333333", -65),
        ("peer-3", "HIVE_TEST-44444444", -70),
    ];
    for &(identifier, advertised_name, rssi) in neighbours.iter() {
        mesh.on_ble_discovered(identifier, Some(advertised_name), rssi, Some("TEST"), 1000);
        mesh.on_ble_connected(identifier, 1000);
    }

    let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));

    let source_included = targets
        .iter()
        .any(|peer| peer.node_id.as_u32() == 0x33333333);
    assert!(!source_included);
}
4656
/// Two distinct message ids populate the seen-cache (size 2); `clear_seen_cache` must bring
/// the reported size back to zero.
#[test]
fn test_clear_seen_cache() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let sender = NodeId::new(0x22222222);

    let first_id = MessageId::from_content(sender, 1000, 0x11111111);
    mesh.mark_message_seen(first_id, sender, 1000);

    let second_id = MessageId::from_content(sender, 2000, 0x22222222);
    mesh.mark_message_seen(second_id, sender, 2000);

    assert_eq!(mesh.seen_cache_size(), 2);

    mesh.clear_seen_cache();
    assert_eq!(mesh.seen_cache_size(), 0);
}
4680}