1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::collections::HashMap;
60#[cfg(feature = "std")]
61use std::sync::Arc;
62
63use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
64use crate::document_sync::DocumentSync;
65use crate::gossip::{GossipStrategy, RandomFanout};
66use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
67use crate::peer::{
68 ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
69 PeerDegree, PeerManagerConfig, StateCountSummary,
70};
71use crate::peer_manager::PeerManager;
72use crate::relay::{
73 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
74 RELAY_ENVELOPE_MARKER,
75};
76use crate::security::{
77 DeviceIdentity, IdentityAttestation, IdentityRegistry, KeyExchangeMessage, MeshEncryptionKey,
78 PeerEncryptedMessage, PeerSessionManager, RegistryResult, SessionState,
79};
80use crate::sync::crdt::{EventType, Peripheral, PeripheralType};
81use crate::sync::delta::{DeltaEncoder, DeltaStats};
82use crate::sync::delta_document::{DeltaDocument, Operation};
83use crate::NodeId;
84
85#[cfg(feature = "std")]
86use crate::observer::ObserverManager;
87
88use crate::registry::DocumentRegistry;
89
90#[derive(Debug, Clone)]
92pub struct HiveMeshConfig {
93 pub node_id: NodeId,
95
96 pub callsign: String,
98
99 pub mesh_id: String,
101
102 pub peripheral_type: PeripheralType,
104
105 pub peer_config: PeerManagerConfig,
107
108 pub sync_interval_ms: u64,
110
111 pub auto_broadcast_events: bool,
113
114 pub encryption_secret: Option<[u8; 32]>,
120
121 pub strict_encryption: bool,
129
130 pub enable_relay: bool,
137
138 pub max_relay_hops: u8,
143
144 pub relay_fanout: usize,
150
151 pub seen_cache_ttl_ms: u64,
156}
157
158impl HiveMeshConfig {
159 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
161 Self {
162 node_id,
163 callsign: callsign.into(),
164 mesh_id: mesh_id.into(),
165 peripheral_type: PeripheralType::SoldierSensor,
166 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
167 sync_interval_ms: 5000,
168 auto_broadcast_events: true,
169 encryption_secret: None,
170 strict_encryption: false,
171 enable_relay: false,
172 max_relay_hops: DEFAULT_MAX_HOPS,
173 relay_fanout: 2,
174 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
175 }
176 }
177
178 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
183 self.encryption_secret = Some(secret);
184 self
185 }
186
187 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
189 self.peripheral_type = ptype;
190 self
191 }
192
193 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
195 self.sync_interval_ms = interval_ms;
196 self
197 }
198
199 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
201 self.peer_config.peer_timeout_ms = timeout_ms;
202 self
203 }
204
205 pub fn with_max_peers(mut self, max: usize) -> Self {
207 self.peer_config.max_peers = max;
208 self
209 }
210
211 pub fn with_strict_encryption(mut self) -> Self {
219 self.strict_encryption = true;
220 self
221 }
222
223 pub fn with_relay(mut self) -> Self {
228 self.enable_relay = true;
229 self
230 }
231
232 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
236 self.max_relay_hops = max_hops;
237 self
238 }
239
240 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
244 self.relay_fanout = fanout.max(1);
245 self
246 }
247
248 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
252 self.seen_cache_ttl_ms = ttl_ms;
253 self
254 }
255}
256
/// Lock-protected store of type-erased application documents.
///
/// Keys are `(type_id, source_node, timestamp)` tuples; values are boxed
/// documents that are decoded and updated through the document registry.
#[cfg(feature = "std")]
type AppDocumentStore =
    std::sync::RwLock<HashMap<(u8, u32, u64), Box<dyn core::any::Any + Send + Sync>>>;
261
/// A mesh node: bundles peer management, document sync, relay, encryption,
/// identity and application-document state behind one type.
///
/// Mutable state that is touched through `&self` is wrapped in `Mutex` /
/// `RwLock`.
#[cfg(feature = "std")]
pub struct HiveMesh {
    /// Configuration supplied at construction time.
    config: HiveMeshConfig,

    /// Peer manager built from the node id and `config.peer_config`.
    peer_manager: PeerManager,

    /// Document sync state built from node id, callsign and peripheral type.
    document_sync: DocumentSync,

    /// Observers that receive `HiveEvent` notifications.
    observers: ObserverManager,

    /// Initialised to 0 by the constructors.
    // NOTE(review): stored as u32 while timestamps elsewhere are u64 milliseconds —
    // confirm truncation/wrap-around is intended where this field is written.
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0 by the constructors (same u32-vs-u64 caveat as `last_sync_ms`).
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` when encryption is disabled.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer E2EE session manager; `None` until per-peer E2EE is enabled.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state graph; updated when relayed messages reveal indirect peers.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of already-seen relay message ids, used to drop duplicates.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy used to choose relay targets among connected peers.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta tracking and transfer statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,

    /// Cryptographic identity of this node, if it was constructed with one.
    identity: Option<DeviceIdentity>,

    /// Registry of peer identities (node id to public key).
    identity_registry: std::sync::Mutex<IdentityRegistry>,

    /// Most recent peripheral received from each origin node.
    peer_peripherals: std::sync::RwLock<HashMap<NodeId, Peripheral>>,

    /// Registry used to decode and update application document types.
    document_registry: DocumentRegistry,

    /// Application documents keyed by `(type_id, source_node, timestamp)`.
    app_documents: AppDocumentStore,
}
337
338#[cfg(feature = "std")]
339impl HiveMesh {
340 pub fn new(config: HiveMeshConfig) -> Self {
342 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
343 let document_sync = DocumentSync::with_peripheral_type(
344 config.node_id,
345 &config.callsign,
346 config.peripheral_type,
347 );
348
349 let encryption_key = config
351 .encryption_secret
352 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
353
354 let connection_graph = ConnectionStateGraph::with_config(
356 config.peer_config.rssi_degraded_threshold,
357 config.peer_config.lost_timeout_ms,
358 );
359
360 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
362
363 let gossip_strategy: Box<dyn GossipStrategy> =
365 Box::new(RandomFanout::new(config.relay_fanout));
366
367 let delta_encoder = DeltaEncoder::new(config.node_id);
369
370 let document_registry = DocumentRegistry::new();
372 #[cfg(feature = "hive-lite-sync")]
373 {
374 use crate::hive_lite_sync::CannedMessageDocument;
375 document_registry.register::<CannedMessageDocument>();
376 log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
377 }
378
379 Self {
380 config,
381 peer_manager,
382 document_sync,
383 observers: ObserverManager::new(),
384 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
385 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
386 encryption_key,
387 peer_sessions: std::sync::Mutex::new(None),
388 connection_graph: std::sync::Mutex::new(connection_graph),
389 seen_cache: std::sync::Mutex::new(seen_cache),
390 gossip_strategy,
391 delta_encoder: std::sync::Mutex::new(delta_encoder),
392 identity: None,
393 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
394 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
395 document_registry,
396 app_documents: std::sync::RwLock::new(HashMap::new()),
397 }
398 }
399
400 pub fn with_identity(config: HiveMeshConfig, identity: DeviceIdentity) -> Self {
406 let mut config = config;
408 config.node_id = identity.node_id();
409
410 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
411 let document_sync = DocumentSync::with_peripheral_type(
412 config.node_id,
413 &config.callsign,
414 config.peripheral_type,
415 );
416
417 let encryption_key = config
418 .encryption_secret
419 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
420
421 let connection_graph = ConnectionStateGraph::with_config(
422 config.peer_config.rssi_degraded_threshold,
423 config.peer_config.lost_timeout_ms,
424 );
425
426 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
427 let gossip_strategy: Box<dyn GossipStrategy> =
428 Box::new(RandomFanout::new(config.relay_fanout));
429 let delta_encoder = DeltaEncoder::new(config.node_id);
430
431 let document_registry = DocumentRegistry::new();
433 #[cfg(feature = "hive-lite-sync")]
434 {
435 use crate::hive_lite_sync::CannedMessageDocument;
436 document_registry.register::<CannedMessageDocument>();
437 log::info!("Auto-registered CannedMessageDocument (0xC0) for hive-lite sync");
438 }
439
440 Self {
441 config,
442 peer_manager,
443 document_sync,
444 observers: ObserverManager::new(),
445 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
446 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
447 encryption_key,
448 peer_sessions: std::sync::Mutex::new(None),
449 connection_graph: std::sync::Mutex::new(connection_graph),
450 seen_cache: std::sync::Mutex::new(seen_cache),
451 gossip_strategy,
452 delta_encoder: std::sync::Mutex::new(delta_encoder),
453 identity: Some(identity),
454 identity_registry: std::sync::Mutex::new(IdentityRegistry::new()),
455 peer_peripherals: std::sync::RwLock::new(HashMap::new()),
456 document_registry,
457 app_documents: std::sync::RwLock::new(HashMap::new()),
458 }
459 }
460
461 pub fn from_genesis(
469 genesis: &crate::security::MeshGenesis,
470 identity: DeviceIdentity,
471 callsign: &str,
472 ) -> Self {
473 let config = HiveMeshConfig::new(identity.node_id(), callsign, &genesis.mesh_id())
474 .with_encryption(genesis.encryption_secret());
475
476 Self::with_identity(config, identity)
477 }
478
479 #[cfg(feature = "std")]
505 pub fn from_persisted(
506 state: crate::security::PersistedState,
507 callsign: &str,
508 ) -> Result<Self, crate::security::PersistenceError> {
509 let identity = state.restore_identity()?;
511
512 let genesis = state.restore_genesis();
514
515 let mesh = if let Some(ref gen) = genesis {
517 Self::from_genesis(gen, identity, callsign)
518 } else {
519 let config = HiveMeshConfig::new(identity.node_id(), callsign, "RESTORED");
520 Self::with_identity(config, identity)
521 };
522
523 let restored_registry = state.restore_registry();
525 if let Ok(mut registry) = mesh.identity_registry.lock() {
526 *registry = restored_registry;
527 }
528
529 log::info!(
530 "HiveMesh restored from persisted state: node_id={:08X}, known_peers={}",
531 mesh.config.node_id.as_u32(),
532 mesh.known_identity_count()
533 );
534
535 Ok(mesh)
536 }
537
538 #[cfg(feature = "std")]
551 pub fn to_persisted_state(
552 &self,
553 genesis: Option<&crate::security::MeshGenesis>,
554 ) -> Option<crate::security::PersistedState> {
555 let identity = self.identity.as_ref()?;
556 let registry = self.identity_registry.lock().ok()?;
557
558 Some(crate::security::PersistedState::with_registry(
559 identity, genesis, ®istry,
560 ))
561 }
562
563 pub fn is_encryption_enabled(&self) -> bool {
567 self.encryption_key.is_some()
568 }
569
570 pub fn is_strict_encryption_enabled(&self) -> bool {
574 self.config.strict_encryption && self.encryption_key.is_some()
575 }
576
577 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
582 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
583 &self.config.mesh_id,
584 secret,
585 ));
586 }
587
588 pub fn disable_encryption(&mut self) {
590 self.encryption_key = None;
591 }
592
593 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
598 match &self.encryption_key {
599 Some(key) => {
600 match key.encrypt_to_bytes(plaintext) {
602 Ok(ciphertext) => {
603 let mut buf = Vec::with_capacity(2 + ciphertext.len());
604 buf.push(ENCRYPTED_MARKER);
605 buf.push(0x00); buf.extend_from_slice(&ciphertext);
607 buf
608 }
609 Err(e) => {
610 log::error!("Encryption failed: {}", e);
611 plaintext.to_vec()
613 }
614 }
615 }
616 None => plaintext.to_vec(),
617 }
618 }
619
620 fn decrypt_document<'a>(
628 &self,
629 data: &'a [u8],
630 source_hint: Option<&str>,
631 ) -> Option<std::borrow::Cow<'a, [u8]>> {
632 log::debug!(
633 "decrypt_document: len={}, first_byte=0x{:02X}, source={:?}",
634 data.len(),
635 data.first().copied().unwrap_or(0),
636 source_hint
637 );
638
639 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
641 let _reserved = data[1];
643 let encrypted_payload = &data[2..];
644
645 log::debug!(
646 "decrypt_document: encrypted payload len={}, nonce+ciphertext",
647 encrypted_payload.len()
648 );
649
650 match &self.encryption_key {
651 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
652 Ok(plaintext) => {
653 log::debug!(
654 "decrypt_document: SUCCESS, plaintext len={}",
655 plaintext.len()
656 );
657 Some(std::borrow::Cow::Owned(plaintext))
658 }
659 Err(e) => {
660 log::warn!(
661 "decrypt_document: FAILED (wrong key or corrupted): {} [payload_len={}, source={:?}]",
662 e,
663 encrypted_payload.len(),
664 source_hint
665 );
666 self.notify(HiveEvent::SecurityViolation {
667 kind: SecurityViolationKind::DecryptionFailed,
668 source: source_hint.map(String::from),
669 });
670 None
671 }
672 },
673 None => {
674 log::warn!(
675 "decrypt_document: encryption not enabled but received encrypted doc"
676 );
677 None
678 }
679 }
680 } else {
681 if self.config.strict_encryption && self.encryption_key.is_some() {
684 log::warn!(
685 "Rejected unencrypted document in strict encryption mode (source: {:?})",
686 source_hint
687 );
688 self.notify(HiveEvent::SecurityViolation {
689 kind: SecurityViolationKind::UnencryptedInStrictMode,
690 source: source_hint.map(String::from),
691 });
692 None
693 } else {
694 Some(std::borrow::Cow::Borrowed(data))
696 }
697 }
698 }
699
700 pub fn decrypt_only(&self, data: &[u8]) -> Option<Vec<u8>> {
714 self.decrypt_document(data, None)
715 .map(|cow| cow.into_owned())
716 }
717
718 pub fn has_identity(&self) -> bool {
722 self.identity.is_some()
723 }
724
725 pub fn public_key(&self) -> Option<[u8; 32]> {
727 self.identity.as_ref().map(|id| id.public_key())
728 }
729
730 pub fn create_attestation(&self, now_ms: u64) -> Option<IdentityAttestation> {
734 self.identity
735 .as_ref()
736 .map(|id| id.create_attestation(now_ms))
737 }
738
739 pub fn verify_peer_identity(&self, attestation: &IdentityAttestation) -> RegistryResult {
748 self.identity_registry
749 .lock()
750 .unwrap()
751 .verify_or_register(attestation)
752 }
753
754 pub fn is_peer_identity_known(&self, node_id: NodeId) -> bool {
756 self.identity_registry.lock().unwrap().is_known(node_id)
757 }
758
759 pub fn peer_public_key(&self, node_id: NodeId) -> Option<[u8; 32]> {
761 self.identity_registry
762 .lock()
763 .unwrap()
764 .get_public_key(node_id)
765 .copied()
766 }
767
768 pub fn known_identity_count(&self) -> usize {
770 self.identity_registry.lock().unwrap().len()
771 }
772
773 pub fn pre_register_peer_identity(&self, node_id: NodeId, public_key: [u8; 32], now_ms: u64) {
778 self.identity_registry
779 .lock()
780 .unwrap()
781 .pre_register(node_id, public_key, now_ms);
782 }
783
784 pub fn forget_peer_identity(&self, node_id: NodeId) {
788 self.identity_registry.lock().unwrap().remove(node_id);
789 }
790
791 pub fn sign(&self, data: &[u8]) -> Option<[u8; 64]> {
795 self.identity.as_ref().map(|id| id.sign(data))
796 }
797
798 pub fn verify_peer_signature(
803 &self,
804 node_id: NodeId,
805 data: &[u8],
806 signature: &[u8; 64],
807 ) -> bool {
808 if let Some(public_key) = self.peer_public_key(node_id) {
809 crate::security::verify_signature(&public_key, data, signature)
810 } else {
811 false
812 }
813 }
814
815 pub fn is_relay_enabled(&self) -> bool {
819 self.config.enable_relay
820 }
821
822 pub fn enable_relay(&mut self) {
824 self.config.enable_relay = true;
825 }
826
827 pub fn disable_relay(&mut self) {
829 self.config.enable_relay = false;
830 }
831
832 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
836 self.seen_cache.lock().unwrap().has_seen(message_id)
837 }
838
839 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
843 self.seen_cache
844 .lock()
845 .unwrap()
846 .check_and_mark(message_id, origin, now_ms)
847 }
848
849 pub fn seen_cache_size(&self) -> usize {
851 self.seen_cache.lock().unwrap().len()
852 }
853
854 pub fn clear_seen_cache(&self) {
856 self.seen_cache.lock().unwrap().clear();
857 }
858
859 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
864 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
865 .with_max_hops(self.config.max_relay_hops);
866 envelope.encode()
867 }
868
869 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
874 let connected = self.peer_manager.get_connected_peers();
875 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
876 connected
877 .into_iter()
878 .filter(|p| p.node_id != exclude)
879 .collect()
880 } else {
881 connected
882 };
883
884 self.gossip_strategy
885 .select_peers(&filtered)
886 .into_iter()
887 .cloned()
888 .collect()
889 }
890
891 pub fn process_relay_envelope(
901 &self,
902 data: &[u8],
903 source_peer: NodeId,
904 now_ms: u64,
905 ) -> Option<RelayDecision> {
906 let envelope = RelayEnvelope::decode(data)?;
908
909 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
912 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
913 source_peer,
914 envelope.origin_node,
915 envelope.hop_count,
916 now_ms,
917 );
918
919 if is_new {
920 log::debug!(
921 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
922 envelope.origin_node.as_u32(),
923 source_peer.as_u32(),
924 envelope.hop_count
925 );
926 }
927 }
928
929 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
931 let stats = self
933 .seen_cache
934 .lock()
935 .unwrap()
936 .get_stats(&envelope.message_id);
937 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
938
939 self.notify(HiveEvent::DuplicateMessageDropped {
940 origin_node: envelope.origin_node,
941 seen_count,
942 });
943
944 log::debug!(
945 "Dropped duplicate message {} from {:08X} (seen {} times)",
946 envelope.message_id,
947 envelope.origin_node.as_u32(),
948 seen_count
949 );
950 return None;
951 }
952
953 if !envelope.can_relay() {
955 self.notify(HiveEvent::MessageTtlExpired {
956 origin_node: envelope.origin_node,
957 hop_count: envelope.hop_count,
958 });
959
960 log::debug!(
961 "Message {} from {:08X} TTL expired at hop {}",
962 envelope.message_id,
963 envelope.origin_node.as_u32(),
964 envelope.hop_count
965 );
966
967 return Some(RelayDecision {
969 payload: envelope.payload,
970 origin_node: envelope.origin_node,
971 hop_count: envelope.hop_count,
972 should_relay: false,
973 relay_envelope: None,
974 });
975 }
976
977 let should_relay = self.config.enable_relay;
979 let relay_envelope = if should_relay {
980 envelope.relay() } else {
982 None
983 };
984
985 Some(RelayDecision {
986 payload: envelope.payload,
987 origin_node: envelope.origin_node,
988 hop_count: envelope.hop_count,
989 should_relay,
990 relay_envelope,
991 })
992 }
993
994 pub fn build_relay_document(&self) -> Vec<u8> {
999 let doc = self.build_document(); self.wrap_for_relay(doc)
1001 }
1002
1003 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
1010 let mut encoder = self.delta_encoder.lock().unwrap();
1011 encoder.add_peer(peer_id);
1012 log::debug!(
1013 "Registered peer {:08X} for delta sync tracking",
1014 peer_id.as_u32()
1015 );
1016 }
1017
1018 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
1022 let mut encoder = self.delta_encoder.lock().unwrap();
1023 encoder.remove_peer(peer_id);
1024 log::debug!(
1025 "Unregistered peer {:08X} from delta sync tracking",
1026 peer_id.as_u32()
1027 );
1028 }
1029
1030 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
1035 let mut encoder = self.delta_encoder.lock().unwrap();
1036 encoder.reset_peer(peer_id);
1037 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
1038 }
1039
1040 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
1042 let mut encoder = self.delta_encoder.lock().unwrap();
1043 encoder.record_sent(peer_id, bytes);
1044 }
1045
1046 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
1048 let mut encoder = self.delta_encoder.lock().unwrap();
1049 encoder.record_received(peer_id, bytes, timestamp);
1050 }
1051
1052 pub fn delta_stats(&self) -> DeltaStats {
1057 self.delta_encoder.lock().unwrap().stats()
1058 }
1059
1060 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
1064 let encoder = self.delta_encoder.lock().unwrap();
1065 encoder
1066 .get_peer_state(peer_id)
1067 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
1068 }
1069
1070 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
1078 let mut all_operations: Vec<Operation> = Vec::new();
1080
1081 for (node_id_u32, count) in self.document_sync.counter_entries() {
1084 all_operations.push(Operation::IncrementCounter {
1085 counter_id: 0, node_id: NodeId::new(node_id_u32),
1087 amount: count,
1088 timestamp: count, });
1090 }
1091
1092 let peripheral = self.document_sync.peripheral_snapshot();
1095 let peripheral_timestamp = peripheral
1096 .last_event
1097 .as_ref()
1098 .map(|e| e.timestamp)
1099 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
1101 peripheral,
1102 timestamp: peripheral_timestamp,
1103 });
1104
1105 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1107 let source_node = NodeId::new(emergency.source_node());
1108 let timestamp = emergency.timestamp();
1109
1110 all_operations.push(Operation::SetEmergency {
1112 source_node,
1113 timestamp,
1114 known_peers: emergency.all_nodes(),
1115 });
1116
1117 for acked_node in emergency.acked_nodes() {
1119 all_operations.push(Operation::AckEmergency {
1120 node_id: NodeId::new(acked_node),
1121 emergency_timestamp: timestamp,
1122 });
1123 }
1124 }
1125
1126 for app_op in self.app_document_delta_ops() {
1128 all_operations.push(Operation::App(app_op));
1129 }
1130
1131 let filtered_operations: Vec<Operation> = {
1133 let encoder = self.delta_encoder.lock().unwrap();
1134 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
1135 all_operations
1136 .into_iter()
1137 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
1138 .collect()
1139 } else {
1140 all_operations
1142 }
1143 };
1144
1145 if filtered_operations.is_empty() {
1147 return None;
1148 }
1149
1150 {
1152 let mut encoder = self.delta_encoder.lock().unwrap();
1153 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
1154 for op in &filtered_operations {
1155 peer_state.mark_sent(&op.key(), op.timestamp());
1156 }
1157 }
1158 }
1159
1160 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1162 for op in filtered_operations {
1163 delta.add_operation(op);
1164 }
1165
1166 let encoded = delta.encode();
1168 let result = self.encrypt_document(&encoded);
1169
1170 {
1172 let mut encoder = self.delta_encoder.lock().unwrap();
1173 encoder.record_sent(peer_id, result.len());
1174 }
1175
1176 Some(result)
1177 }
1178
1179 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
1184 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
1185
1186 for (node_id_u32, count) in self.document_sync.counter_entries() {
1188 delta.add_operation(Operation::IncrementCounter {
1189 counter_id: 0,
1190 node_id: NodeId::new(node_id_u32),
1191 amount: count,
1192 timestamp: now_ms,
1193 });
1194 }
1195
1196 let peripheral = self.document_sync.peripheral_snapshot();
1198 let peripheral_timestamp = peripheral
1199 .last_event
1200 .as_ref()
1201 .map(|e| e.timestamp)
1202 .unwrap_or(now_ms);
1203 delta.add_operation(Operation::UpdatePeripheral {
1204 peripheral,
1205 timestamp: peripheral_timestamp,
1206 });
1207
1208 if let Some(emergency) = self.document_sync.emergency_snapshot() {
1210 let source_node = NodeId::new(emergency.source_node());
1211 let timestamp = emergency.timestamp();
1212
1213 delta.add_operation(Operation::SetEmergency {
1214 source_node,
1215 timestamp,
1216 known_peers: emergency.all_nodes(),
1217 });
1218
1219 for acked_node in emergency.acked_nodes() {
1220 delta.add_operation(Operation::AckEmergency {
1221 node_id: NodeId::new(acked_node),
1222 emergency_timestamp: timestamp,
1223 });
1224 }
1225 }
1226
1227 for app_op in self.app_document_delta_ops() {
1229 delta.add_operation(Operation::App(app_op));
1230 }
1231
1232 let encoded = delta.encode();
1233 self.encrypt_document(&encoded)
1234 }
1235
1236 fn process_delta_document_internal(
1240 &self,
1241 source_node: NodeId,
1242 data: &[u8],
1243 now_ms: u64,
1244 relay_data: Option<Vec<u8>>,
1245 origin_node: Option<NodeId>,
1246 hop_count: u8,
1247 ) -> Option<DataReceivedResult> {
1248 let delta = DeltaDocument::decode(data)?;
1250
1251 if delta.origin_node == self.config.node_id {
1253 return None;
1254 }
1255
1256 let mut counter_changed = false;
1258 let mut emergency_changed = false;
1259 let mut is_emergency = false;
1260 let mut is_ack = false;
1261 let mut event_timestamp = 0u64;
1262 let mut peer_peripheral: Option<crate::sync::crdt::Peripheral> = None;
1263
1264 log::info!(
1265 "Delta document from {:08X}: {} operations, data_len={}",
1266 delta.origin_node.as_u32(),
1267 delta.operations.len(),
1268 data.len()
1269 );
1270 for op in &delta.operations {
1271 log::info!(" Operation: {}", op.key());
1272 match op {
1273 Operation::IncrementCounter {
1274 node_id, amount, ..
1275 } => {
1276 let current = self.document_sync.counter_entries();
1278 let current_value = current
1279 .iter()
1280 .find(|(id, _)| *id == node_id.as_u32())
1281 .map(|(_, v)| *v)
1282 .unwrap_or(0);
1283
1284 if *amount > current_value {
1285 counter_changed = true;
1288 }
1289 }
1290 Operation::UpdatePeripheral {
1291 peripheral,
1292 timestamp,
1293 } => {
1294 if let Ok(mut peripherals) = self.peer_peripherals.write() {
1296 peripherals.insert(delta.origin_node, peripheral.clone());
1297 }
1298 peer_peripheral = Some(peripheral.clone());
1300 if *timestamp > event_timestamp {
1302 event_timestamp = *timestamp;
1303 }
1304 }
1305 Operation::SetEmergency { timestamp, .. } => {
1306 is_emergency = true;
1307 emergency_changed = true;
1308 event_timestamp = *timestamp;
1309 }
1310 Operation::AckEmergency {
1311 emergency_timestamp,
1312 ..
1313 } => {
1314 is_ack = true;
1315 emergency_changed = true;
1316 if *emergency_timestamp > event_timestamp {
1317 event_timestamp = *emergency_timestamp;
1318 }
1319 }
1320 Operation::ClearEmergency {
1321 emergency_timestamp,
1322 } => {
1323 emergency_changed = true;
1324 if *emergency_timestamp > event_timestamp {
1325 event_timestamp = *emergency_timestamp;
1326 }
1327 }
1328 Operation::App(app_op) => {
1329 let doc_timestamp = app_op.timestamp & 0x0000_FFFF_FFFF_FFFF;
1335
1336 log::info!(
1337 "App operation received: type={:02X} op_code={:02X} from {:08X} ts={} payload_len={}",
1338 app_op.type_id,
1339 app_op.op_code,
1340 app_op.source_node,
1341 doc_timestamp,
1342 app_op.payload.len()
1343 );
1344
1345 let doc_key = (app_op.type_id, app_op.source_node, doc_timestamp);
1347 let changed = {
1348 let mut docs = self.app_documents.write().unwrap();
1349
1350 if let Some(existing) = docs.get_mut(&doc_key) {
1351 self.document_registry.apply_delta_op(
1353 app_op.type_id,
1354 existing.as_mut(),
1355 app_op,
1356 )
1357 } else {
1358 if let Some(decoded) = self
1360 .document_registry
1361 .decode(app_op.type_id, &app_op.payload)
1362 {
1363 docs.insert(doc_key, decoded);
1364 true
1365 } else {
1366 log::debug!(
1369 "Received delta for unknown doc {:?}, waiting for full state",
1370 doc_key
1371 );
1372 false
1373 }
1374 }
1375 };
1376
1377 self.observers.notify(HiveEvent::app_document_received(
1379 app_op.type_id,
1380 NodeId::new(app_op.source_node),
1381 doc_timestamp,
1382 changed,
1383 ));
1384 }
1385 }
1386 }
1387
1388 self.peer_manager.record_sync(source_node, now_ms);
1390
1391 {
1393 let mut encoder = self.delta_encoder.lock().unwrap();
1394 encoder.record_received(&source_node, data.len(), now_ms);
1395 }
1396
1397 if is_emergency {
1399 self.notify(HiveEvent::EmergencyReceived {
1400 from_node: delta.origin_node,
1401 });
1402 } else if is_ack {
1403 self.notify(HiveEvent::AckReceived {
1404 from_node: delta.origin_node,
1405 });
1406 }
1407
1408 if counter_changed {
1409 let total_count = self.document_sync.total_count();
1410 self.notify(HiveEvent::DocumentSynced {
1411 from_node: delta.origin_node,
1412 total_count,
1413 });
1414 }
1415
1416 if relay_data.is_some() {
1418 let relay_targets = self.get_relay_targets(Some(source_node));
1419 self.notify(HiveEvent::MessageRelayed {
1420 origin_node: origin_node.unwrap_or(delta.origin_node),
1421 relay_count: relay_targets.len(),
1422 hop_count,
1423 });
1424 }
1425
1426 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
1427 DataReceivedResult::peripheral_fields(&peer_peripheral);
1428
1429 Some(DataReceivedResult {
1430 source_node: delta.origin_node,
1431 is_emergency,
1432 is_ack,
1433 counter_changed,
1434 emergency_changed,
1435 total_count: self.document_sync.total_count(),
1436 event_timestamp,
1437 relay_data,
1438 origin_node,
1439 hop_count,
1440 callsign,
1441 battery_percent,
1442 heart_rate,
1443 event_type,
1444 latitude,
1445 longitude,
1446 altitude,
1447 })
1448 }
1449
1450 pub fn enable_peer_e2ee(&self) {
1458 let mut sessions = self.peer_sessions.lock().unwrap();
1459 if sessions.is_none() {
1460 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1461 log::info!(
1462 "Per-peer E2EE enabled for node {:08X}",
1463 self.config.node_id.as_u32()
1464 );
1465 }
1466 }
1467
1468 pub fn disable_peer_e2ee(&self) {
1472 let mut sessions = self.peer_sessions.lock().unwrap();
1473 *sessions = None;
1474 log::info!("Per-peer E2EE disabled");
1475 }
1476
1477 pub fn is_peer_e2ee_enabled(&self) -> bool {
1479 self.peer_sessions.lock().unwrap().is_some()
1480 }
1481
1482 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1486 self.peer_sessions
1487 .lock()
1488 .unwrap()
1489 .as_ref()
1490 .map(|s| s.our_public_key())
1491 }
1492
1493 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1499 let mut sessions = self.peer_sessions.lock().unwrap();
1500 let session_mgr = sessions.as_mut()?;
1501
1502 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1503 let mut buf = Vec::with_capacity(2 + 37);
1504 buf.push(KEY_EXCHANGE_MARKER);
1505 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1507
1508 log::info!(
1509 "Initiated E2EE session with peer {:08X}",
1510 peer_node_id.as_u32()
1511 );
1512 Some(buf)
1513 }
1514
1515 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1517 self.peer_sessions
1518 .lock()
1519 .unwrap()
1520 .as_ref()
1521 .is_some_and(|s| s.has_session(peer_node_id))
1522 }
1523
1524 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1526 self.peer_sessions
1527 .lock()
1528 .unwrap()
1529 .as_ref()
1530 .and_then(|s| s.session_state(peer_node_id))
1531 }
1532
1533 pub fn send_peer_e2ee(
1538 &self,
1539 peer_node_id: NodeId,
1540 plaintext: &[u8],
1541 now_ms: u64,
1542 ) -> Option<Vec<u8>> {
1543 let mut sessions = self.peer_sessions.lock().unwrap();
1544 let session_mgr = sessions.as_mut()?;
1545
1546 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1547 Ok(encrypted) => {
1548 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1549 buf.push(PEER_E2EE_MARKER);
1550 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1552 Some(buf)
1553 }
1554 Err(e) => {
1555 log::warn!(
1556 "Failed to encrypt for peer {:08X}: {:?}",
1557 peer_node_id.as_u32(),
1558 e
1559 );
1560 None
1561 }
1562 }
1563 }
1564
1565 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1567 let mut sessions = self.peer_sessions.lock().unwrap();
1568 if let Some(session_mgr) = sessions.as_mut() {
1569 session_mgr.close_session(peer_node_id);
1570 self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1571 log::info!(
1572 "Closed E2EE session with peer {:08X}",
1573 peer_node_id.as_u32()
1574 );
1575 }
1576 }
1577
1578 pub fn peer_e2ee_session_count(&self) -> usize {
1580 self.peer_sessions
1581 .lock()
1582 .unwrap()
1583 .as_ref()
1584 .map(|s| s.session_count())
1585 .unwrap_or(0)
1586 }
1587
1588 pub fn peer_e2ee_established_count(&self) -> usize {
1590 self.peer_sessions
1591 .lock()
1592 .unwrap()
1593 .as_ref()
1594 .map(|s| s.established_count())
1595 .unwrap_or(0)
1596 }
1597
1598 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1603 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1604 return None;
1605 }
1606
1607 let payload = &data[2..];
1608 let msg = KeyExchangeMessage::decode(payload)?;
1609
1610 let mut sessions = self.peer_sessions.lock().unwrap();
1611 let session_mgr = sessions.as_mut()?;
1612
1613 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1614
1615 if established {
1616 self.notify(HiveEvent::PeerE2eeEstablished {
1617 peer_node_id: msg.sender_node_id,
1618 });
1619 log::info!(
1620 "E2EE session established with peer {:08X}",
1621 msg.sender_node_id.as_u32()
1622 );
1623 }
1624
1625 let mut buf = Vec::with_capacity(2 + 37);
1627 buf.push(KEY_EXCHANGE_MARKER);
1628 buf.push(0x00);
1629 buf.extend_from_slice(&response.encode());
1630 Some(buf)
1631 }
1632
1633 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1638 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1639 return None;
1640 }
1641
1642 let payload = &data[2..];
1643 let msg = PeerEncryptedMessage::decode(payload)?;
1644
1645 let mut sessions = self.peer_sessions.lock().unwrap();
1646 let session_mgr = sessions.as_mut()?;
1647
1648 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1649 Ok(plaintext) => {
1650 self.notify(HiveEvent::PeerE2eeMessageReceived {
1652 from_node: msg.sender_node_id,
1653 data: plaintext.clone(),
1654 });
1655 Some(plaintext)
1656 }
1657 Err(e) => {
1658 log::warn!(
1659 "Failed to decrypt E2EE message from {:08X}: {:?}",
1660 msg.sender_node_id.as_u32(),
1661 e
1662 );
1663 None
1664 }
1665 }
1666 }
1667
1668 pub fn node_id(&self) -> NodeId {
1672 self.config.node_id
1673 }
1674
1675 pub fn callsign(&self) -> &str {
1677 &self.config.callsign
1678 }
1679
1680 pub fn mesh_id(&self) -> &str {
1682 &self.config.mesh_id
1683 }
1684
1685 pub fn device_name(&self) -> String {
1687 format!(
1688 "HIVE_{}-{:08X}",
1689 self.config.mesh_id,
1690 self.config.node_id.as_u32()
1691 )
1692 }
1693
1694 pub fn get_peer_callsign(&self, node_id: NodeId) -> Option<String> {
1699 self.peer_peripherals.read().ok().and_then(|peripherals| {
1700 peripherals
1701 .get(&node_id)
1702 .map(|p| p.callsign_str().to_string())
1703 })
1704 }
1705
1706 pub fn get_peer_peripheral(&self, node_id: NodeId) -> Option<Peripheral> {
1711 self.peer_peripherals
1712 .read()
1713 .ok()
1714 .and_then(|peripherals| peripherals.get(&node_id).cloned())
1715 }
1716
1717 pub fn document_registry(&self) -> &DocumentRegistry {
1732 &self.document_registry
1733 }
1734
1735 pub fn store_app_document<T: crate::registry::DocumentType>(&self, doc: T) -> bool {
1742 let type_id = T::TYPE_ID;
1743 let (source_node, timestamp) = doc.identity();
1744 let key = (type_id, source_node, timestamp);
1745
1746 let mut docs = self.app_documents.write().unwrap();
1747
1748 if let Some(existing) = docs.get_mut(&key) {
1749 self.document_registry
1751 .merge(type_id, existing.as_mut(), &doc)
1752 } else {
1753 docs.insert(key, Box::new(doc));
1755 true
1756 }
1757 }
1758
1759 pub fn store_app_document_boxed(
1766 &self,
1767 type_id: u8,
1768 source_node: u32,
1769 timestamp: u64,
1770 doc: Box<dyn core::any::Any + Send + Sync>,
1771 ) -> bool {
1772 let key = (type_id, source_node, timestamp);
1773
1774 let mut docs = self.app_documents.write().unwrap();
1775
1776 if let Some(existing) = docs.get_mut(&key) {
1777 self.document_registry
1779 .merge(type_id, existing.as_mut(), doc.as_ref())
1780 } else {
1781 docs.insert(key, doc);
1783 true
1784 }
1785 }
1786
1787 pub fn get_app_document<T: crate::registry::DocumentType>(
1791 &self,
1792 source_node: u32,
1793 timestamp: u64,
1794 ) -> Option<T> {
1795 let key = (T::TYPE_ID, source_node, timestamp);
1796
1797 let docs = self.app_documents.read().unwrap();
1798 docs.get(&key).and_then(|d| d.downcast_ref::<T>()).cloned()
1799 }
1800
1801 pub fn get_all_app_documents_of_type<T: crate::registry::DocumentType>(&self) -> Vec<T> {
1805 let docs = self.app_documents.read().unwrap();
1806 docs.iter()
1807 .filter(|((type_id, _, _), _)| *type_id == T::TYPE_ID)
1808 .filter_map(|(_, doc)| doc.downcast_ref::<T>().cloned())
1809 .collect()
1810 }
1811
1812 pub fn app_document_delta_ops(&self) -> Vec<crate::registry::AppOperation> {
1816 let docs = self.app_documents.read().unwrap();
1817 let mut ops = Vec::new();
1818
1819 for ((type_id, _source, _ts), doc) in docs.iter() {
1820 if let Some(op) = self.document_registry.to_delta_op(*type_id, doc.as_ref()) {
1821 ops.push(op);
1822 }
1823 }
1824
1825 ops
1826 }
1827
1828 pub fn app_document_keys(&self, type_id: u8) -> Vec<(u32, u64)> {
1832 let docs = self.app_documents.read().unwrap();
1833 docs.keys()
1834 .filter(|(tid, _, _)| *tid == type_id)
1835 .map(|(_, source, ts)| (*source, *ts))
1836 .collect()
1837 }
1838
1839 pub fn app_document_count(&self) -> usize {
1841 self.app_documents.read().unwrap().len()
1842 }
1843
1844 pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
1848 self.observers.add(observer);
1849 }
1850
1851 pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
1853 self.observers.remove(observer);
1854 }
1855
1856 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1863 let data = self.document_sync.send_emergency(timestamp);
1864 self.notify(HiveEvent::MeshStateChanged {
1865 peer_count: self.peer_manager.peer_count(),
1866 connected_count: self.peer_manager.connected_count(),
1867 });
1868 self.encrypt_document(&data)
1869 }
1870
1871 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1876 let data = self.document_sync.send_ack(timestamp);
1877 self.notify(HiveEvent::MeshStateChanged {
1878 peer_count: self.peer_manager.peer_count(),
1879 connected_count: self.peer_manager.connected_count(),
1880 });
1881 self.encrypt_document(&data)
1882 }
1883
1884 pub fn broadcast_bytes(&self, payload: &[u8]) -> Vec<u8> {
1891 self.encrypt_document(payload)
1892 }
1893
1894 pub fn clear_event(&self) {
1896 self.document_sync.clear_event();
1897 }
1898
1899 pub fn is_emergency_active(&self) -> bool {
1901 self.document_sync.is_emergency_active()
1902 }
1903
1904 pub fn is_ack_active(&self) -> bool {
1906 self.document_sync.is_ack_active()
1907 }
1908
1909 pub fn current_event(&self) -> Option<EventType> {
1911 self.document_sync.current_event()
1912 }
1913
1914 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1923 let data = self.document_sync.start_emergency(timestamp, known_peers);
1924 self.notify(HiveEvent::MeshStateChanged {
1925 peer_count: self.peer_manager.peer_count(),
1926 connected_count: self.peer_manager.connected_count(),
1927 });
1928 self.encrypt_document(&data)
1929 }
1930
1931 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1935 let peers: Vec<u32> = self
1936 .peer_manager
1937 .get_peers()
1938 .iter()
1939 .map(|p| p.node_id.as_u32())
1940 .collect();
1941 self.start_emergency(timestamp, &peers)
1942 }
1943
1944 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1949 let result = self.document_sync.ack_emergency(timestamp);
1950 if result.is_some() {
1951 self.notify(HiveEvent::MeshStateChanged {
1952 peer_count: self.peer_manager.peer_count(),
1953 connected_count: self.peer_manager.connected_count(),
1954 });
1955 }
1956 result.map(|data| self.encrypt_document(&data))
1957 }
1958
1959 pub fn clear_emergency(&self) {
1961 self.document_sync.clear_emergency();
1962 }
1963
1964 pub fn has_active_emergency(&self) -> bool {
1966 self.document_sync.has_active_emergency()
1967 }
1968
1969 pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
1973 self.document_sync.get_emergency_status()
1974 }
1975
1976 pub fn has_peer_acked(&self, peer_id: u32) -> bool {
1978 self.document_sync.has_peer_acked(peer_id)
1979 }
1980
1981 pub fn all_peers_acked(&self) -> bool {
1983 self.document_sync.all_peers_acked()
1984 }
1985
1986 #[cfg(feature = "legacy-chat")]
1996 pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1997 if self.document_sync.add_chat_message(sender, text, timestamp) {
1998 Some(self.encrypt_document(&self.build_document()))
1999 } else {
2000 None
2001 }
2002 }
2003
2004 #[cfg(feature = "legacy-chat")]
2012 pub fn send_chat_reply(
2013 &self,
2014 sender: &str,
2015 text: &str,
2016 reply_to_node: u32,
2017 reply_to_timestamp: u64,
2018 timestamp: u64,
2019 ) -> Option<Vec<u8>> {
2020 if self.document_sync.add_chat_reply(
2021 sender,
2022 text,
2023 reply_to_node,
2024 reply_to_timestamp,
2025 timestamp,
2026 ) {
2027 Some(self.encrypt_document(&self.build_document()))
2028 } else {
2029 None
2030 }
2031 }
2032
2033 #[cfg(feature = "legacy-chat")]
2035 pub fn chat_count(&self) -> usize {
2036 self.document_sync.chat_count()
2037 }
2038
2039 #[cfg(feature = "legacy-chat")]
2043 pub fn chat_messages_since(
2044 &self,
2045 since_timestamp: u64,
2046 ) -> Vec<(u32, u64, String, String, u32, u64)> {
2047 self.document_sync.chat_messages_since(since_timestamp)
2048 }
2049
2050 #[cfg(feature = "legacy-chat")]
2054 pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
2055 self.document_sync.all_chat_messages()
2056 }
2057
2058 pub fn on_ble_discovered(
2064 &self,
2065 identifier: &str,
2066 name: Option<&str>,
2067 rssi: i8,
2068 mesh_id: Option<&str>,
2069 now_ms: u64,
2070 ) -> Option<HivePeer> {
2071 let (node_id, is_new) = self
2072 .peer_manager
2073 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
2074
2075 let peer = self.peer_manager.get_peer(node_id)?;
2076
2077 {
2079 let mut graph = self.connection_graph.lock().unwrap();
2080 graph.on_discovered(
2081 node_id,
2082 identifier.to_string(),
2083 name.map(|s| s.to_string()),
2084 mesh_id.map(|s| s.to_string()),
2085 rssi,
2086 now_ms,
2087 );
2088 }
2089
2090 if is_new {
2091 self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
2092 self.notify_mesh_state_changed();
2093 }
2094
2095 Some(peer)
2096 }
2097
2098 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
2102 let node_id = match self.peer_manager.on_connected(identifier, now_ms) {
2103 Some(id) => id,
2104 None => {
2105 log::warn!(
2106 "on_ble_connected: identifier {:?} not in peer map — \
2107 use on_incoming_connection() for peripheral connections",
2108 identifier
2109 );
2110 return None;
2111 }
2112 };
2113
2114 {
2116 let mut graph = self.connection_graph.lock().unwrap();
2117 graph.on_connected(node_id, now_ms);
2118 }
2119
2120 self.register_peer_for_delta(&node_id);
2122
2123 self.notify(HiveEvent::PeerConnected { node_id });
2124 self.notify_mesh_state_changed();
2125 Some(node_id)
2126 }
2127
2128 pub fn on_ble_disconnected(
2130 &self,
2131 identifier: &str,
2132 reason: DisconnectReason,
2133 ) -> Option<NodeId> {
2134 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
2135
2136 {
2138 let mut graph = self.connection_graph.lock().unwrap();
2139 let platform_reason = match observer_reason {
2140 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
2141 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
2142 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2143 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2144 DisconnectReason::ConnectionFailed => {
2145 crate::platform::DisconnectReason::ConnectionFailed
2146 }
2147 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2148 };
2149 let now_ms = std::time::SystemTime::now()
2150 .duration_since(std::time::UNIX_EPOCH)
2151 .map(|d| d.as_millis() as u64)
2152 .unwrap_or(0);
2153 graph.on_disconnected(node_id, platform_reason, now_ms);
2154
2155 graph.remove_via_peer(node_id);
2158 }
2159
2160 self.unregister_peer_for_delta(&node_id);
2162
2163 self.notify(HiveEvent::PeerDisconnected {
2164 node_id,
2165 reason: observer_reason,
2166 });
2167 self.notify_mesh_state_changed();
2168 Some(node_id)
2169 }
2170
2171 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
2175 if self
2176 .peer_manager
2177 .on_disconnected_by_node_id(node_id, reason)
2178 {
2179 {
2181 let mut graph = self.connection_graph.lock().unwrap();
2182 let platform_reason = match reason {
2183 DisconnectReason::LocalRequest => {
2184 crate::platform::DisconnectReason::LocalRequest
2185 }
2186 DisconnectReason::RemoteRequest => {
2187 crate::platform::DisconnectReason::RemoteRequest
2188 }
2189 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
2190 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
2191 DisconnectReason::ConnectionFailed => {
2192 crate::platform::DisconnectReason::ConnectionFailed
2193 }
2194 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
2195 };
2196 let now_ms = std::time::SystemTime::now()
2197 .duration_since(std::time::UNIX_EPOCH)
2198 .map(|d| d.as_millis() as u64)
2199 .unwrap_or(0);
2200 graph.on_disconnected(node_id, platform_reason, now_ms);
2201
2202 graph.remove_via_peer(node_id);
2204 }
2205
2206 self.unregister_peer_for_delta(&node_id);
2208
2209 self.notify(HiveEvent::PeerDisconnected { node_id, reason });
2210 self.notify_mesh_state_changed();
2211 }
2212 }
2213
2214 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
2218 let is_new = self
2219 .peer_manager
2220 .on_incoming_connection(identifier, node_id, now_ms);
2221
2222 {
2224 let mut graph = self.connection_graph.lock().unwrap();
2225 if is_new {
2226 graph.on_discovered(
2227 node_id,
2228 identifier.to_string(),
2229 None,
2230 Some(self.config.mesh_id.clone()),
2231 -50, now_ms,
2233 );
2234 }
2235 graph.on_connected(node_id, now_ms);
2236 }
2237
2238 self.register_peer_for_delta(&node_id);
2240
2241 if is_new {
2242 if let Some(peer) = self.peer_manager.get_peer(node_id) {
2243 self.notify(HiveEvent::PeerDiscovered { peer });
2244 }
2245 }
2246
2247 self.notify(HiveEvent::PeerConnected { node_id });
2248 self.notify_mesh_state_changed();
2249
2250 is_new
2251 }
2252
2253 pub fn on_ble_data_received(
2260 &self,
2261 identifier: &str,
2262 data: &[u8],
2263 now_ms: u64,
2264 ) -> Option<DataReceivedResult> {
2265 let node_id = self.peer_manager.get_node_id(identifier)?;
2267
2268 if data.len() >= 2 {
2270 match data[0] {
2271 KEY_EXCHANGE_MARKER => {
2272 let _response = self.handle_key_exchange(data, now_ms);
2274 return None;
2276 }
2277 PEER_E2EE_MARKER => {
2278 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2280 return None;
2282 }
2283 RELAY_ENVELOPE_MARKER => {
2284 return self
2286 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
2287 }
2288 _ => {}
2289 }
2290 }
2291
2292 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
2294 }
2295
2296 #[allow(clippy::too_many_arguments)]
2298 fn process_document_data_with_identifier(
2299 &self,
2300 source_node: NodeId,
2301 identifier: &str,
2302 data: &[u8],
2303 now_ms: u64,
2304 relay_data: Option<Vec<u8>>,
2305 origin_node: Option<NodeId>,
2306 hop_count: u8,
2307 ) -> Option<DataReceivedResult> {
2308 let decrypted = self.decrypt_document(data, Some(identifier))?;
2310
2311 if DeltaDocument::is_delta_document(&decrypted) {
2313 return self.process_delta_document_internal(
2314 source_node,
2315 &decrypted,
2316 now_ms,
2317 relay_data,
2318 origin_node,
2319 hop_count,
2320 );
2321 }
2322
2323 let result = self.document_sync.merge_document(&decrypted)?;
2325
2326 if let Some(ref peripheral) = result.peer_peripheral {
2328 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2329 peripherals.insert(result.source_node, peripheral.clone());
2330 }
2331 }
2332
2333 self.peer_manager.record_sync(source_node, now_ms);
2335
2336 if result.is_emergency() {
2338 self.notify(HiveEvent::EmergencyReceived {
2339 from_node: result.source_node,
2340 });
2341 } else if result.is_ack() {
2342 self.notify(HiveEvent::AckReceived {
2343 from_node: result.source_node,
2344 });
2345 }
2346
2347 if result.counter_changed {
2348 self.notify(HiveEvent::DocumentSynced {
2349 from_node: result.source_node,
2350 total_count: result.total_count,
2351 });
2352 }
2353
2354 if relay_data.is_some() {
2356 let relay_targets = self.get_relay_targets(Some(source_node));
2357 self.notify(HiveEvent::MessageRelayed {
2358 origin_node: origin_node.unwrap_or(result.source_node),
2359 relay_count: relay_targets.len(),
2360 hop_count,
2361 });
2362 }
2363
2364 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2365 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2366
2367 Some(DataReceivedResult {
2368 source_node: result.source_node,
2369 is_emergency: result.is_emergency(),
2370 is_ack: result.is_ack(),
2371 counter_changed: result.counter_changed,
2372 emergency_changed: result.emergency_changed,
2373 total_count: result.total_count,
2374 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2375 relay_data,
2376 origin_node,
2377 hop_count,
2378 callsign,
2379 battery_percent,
2380 heart_rate,
2381 event_type,
2382 latitude,
2383 longitude,
2384 altitude,
2385 })
2386 }
2387
2388 fn handle_relay_envelope_with_identifier(
2390 &self,
2391 source_node: NodeId,
2392 identifier: &str,
2393 data: &[u8],
2394 now_ms: u64,
2395 ) -> Option<DataReceivedResult> {
2396 let envelope = RelayEnvelope::decode(data)?;
2398
2399 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2401 let stats = self
2402 .seen_cache
2403 .lock()
2404 .unwrap()
2405 .get_stats(&envelope.message_id);
2406 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2407
2408 self.notify(HiveEvent::DuplicateMessageDropped {
2409 origin_node: envelope.origin_node,
2410 seen_count,
2411 });
2412 return None;
2413 }
2414
2415 let relay_data = if envelope.can_relay() && self.config.enable_relay {
2417 envelope.relay().map(|e| e.encode())
2418 } else {
2419 if !envelope.can_relay() {
2420 self.notify(HiveEvent::MessageTtlExpired {
2421 origin_node: envelope.origin_node,
2422 hop_count: envelope.hop_count,
2423 });
2424 }
2425 None
2426 };
2427
2428 self.process_document_data_with_identifier(
2430 source_node,
2431 identifier,
2432 &envelope.payload,
2433 now_ms,
2434 relay_data,
2435 Some(envelope.origin_node),
2436 envelope.hop_count,
2437 )
2438 }
2439
2440 pub fn on_ble_data_received_from_node(
2447 &self,
2448 node_id: NodeId,
2449 data: &[u8],
2450 now_ms: u64,
2451 ) -> Option<DataReceivedResult> {
2452 if data.len() >= 2 {
2454 match data[0] {
2455 KEY_EXCHANGE_MARKER => {
2456 let _response = self.handle_key_exchange(data, now_ms);
2457 return None;
2458 }
2459 PEER_E2EE_MARKER => {
2460 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2461 return None;
2462 }
2463 RELAY_ENVELOPE_MARKER => {
2464 return self.handle_relay_envelope(node_id, data, now_ms);
2466 }
2467 _ => {}
2468 }
2469 }
2470
2471 self.process_document_data(node_id, data, now_ms, None, None, 0)
2473 }
2474
2475 pub fn on_ble_data_received_anonymous(
2485 &self,
2486 identifier: &str,
2487 data: &[u8],
2488 now_ms: u64,
2489 ) -> Option<DataReceivedResult> {
2490 log::debug!(
2491 "on_ble_data_received_anonymous: identifier={}, len={}, marker=0x{:02X}",
2492 identifier,
2493 data.len(),
2494 data.first().copied().unwrap_or(0)
2495 );
2496
2497 let decrypted = match self.decrypt_document(data, Some(identifier)) {
2499 Some(d) => d,
2500 None => {
2501 log::warn!(
2502 "on_ble_data_received_anonymous: decrypt/parse FAILED for {} byte doc from {}",
2503 data.len(),
2504 identifier
2505 );
2506 return None;
2507 }
2508 };
2509
2510 if decrypted.len() < 8 {
2513 log::warn!("Decrypted document too short to extract source_node");
2514 return None;
2515 }
2516
2517 let source_node_u32 =
2518 u32::from_le_bytes([decrypted[4], decrypted[5], decrypted[6], decrypted[7]]);
2519 let source_node = NodeId::new(source_node_u32);
2520
2521 log::info!(
2522 "Anonymous document from {}: source_node={:08X}, len={}",
2523 identifier,
2524 source_node_u32,
2525 decrypted.len()
2526 );
2527
2528 self.peer_manager
2531 .register_identifier(identifier, source_node);
2532
2533 let is_delta = DeltaDocument::is_delta_document(&decrypted);
2535 log::info!(
2536 "Document format: delta={}, first_byte=0x{:02X}, len={}",
2537 is_delta,
2538 decrypted.first().copied().unwrap_or(0),
2539 decrypted.len()
2540 );
2541
2542 if is_delta {
2543 return self.process_delta_document_internal(
2544 source_node,
2545 &decrypted,
2546 now_ms,
2547 None,
2548 None,
2549 0,
2550 );
2551 }
2552
2553 const APP_LAYER_MARKER: u8 = 0xAF;
2556 if decrypted.first().copied() == Some(APP_LAYER_MARKER) {
2557 #[cfg(feature = "hive-lite-sync")]
2558 {
2559 use crate::hive_lite_sync::CannedMessageDocument;
2560 use crate::registry::DocumentType;
2561
2562 log::info!(
2563 "App-layer message (0xAF) from {:08X}, {} bytes - storing in registry",
2564 source_node.as_u32(),
2565 decrypted.len()
2566 );
2567
2568 let payload = &decrypted[1..];
2571 if let Some(doc) = CannedMessageDocument::decode(payload) {
2572 let (doc_source, doc_ts) = doc.identity();
2573 let changed = self.store_app_document(doc);
2574 log::info!(
2575 "Stored CannedMessage: source={:08X} ts={} changed={}",
2576 doc_source,
2577 doc_ts,
2578 changed
2579 );
2580
2581 self.observers.notify(HiveEvent::app_document_received(
2583 CannedMessageDocument::TYPE_ID,
2584 NodeId::new(doc_source),
2585 doc_ts,
2586 changed,
2587 ));
2588
2589 return Some(DataReceivedResult {
2591 source_node,
2592 is_emergency: false,
2593 is_ack: false,
2594 counter_changed: false,
2595 emergency_changed: false,
2596 total_count: 0,
2597 event_timestamp: doc_ts,
2598 relay_data: None, origin_node: None,
2600 hop_count: 0,
2601 callsign: None,
2602 battery_percent: None,
2603 heart_rate: None,
2604 event_type: None,
2605 latitude: None,
2606 longitude: None,
2607 altitude: None,
2608 });
2609 } else {
2610 log::warn!("Failed to decode 0xAF message as CannedMessageDocument");
2611 }
2612 }
2613
2614 #[cfg(not(feature = "hive-lite-sync"))]
2615 {
2616 log::debug!("Ignoring 0xAF message (hive-lite-sync feature not enabled)");
2617 }
2618
2619 return None;
2620 }
2621
2622 log::info!(
2624 "Processing legacy document from {:08X}",
2625 source_node.as_u32()
2626 );
2627 let result = self.document_sync.merge_document(&decrypted)?;
2628
2629 log::info!(
2631 "Merge result: peer_peripheral={}, counter_changed={}",
2632 result.peer_peripheral.is_some(),
2633 result.counter_changed
2634 );
2635 if let Some(ref p) = result.peer_peripheral {
2636 log::info!("Peripheral callsign: '{}'", p.callsign_str());
2637 }
2638
2639 self.peer_manager.record_sync(source_node, now_ms);
2641
2642 if result.is_emergency() {
2644 self.notify(HiveEvent::EmergencyReceived {
2645 from_node: result.source_node,
2646 });
2647 } else if result.is_ack() {
2648 self.notify(HiveEvent::AckReceived {
2649 from_node: result.source_node,
2650 });
2651 }
2652
2653 if result.counter_changed {
2654 self.notify(HiveEvent::DocumentSynced {
2655 from_node: result.source_node,
2656 total_count: result.total_count,
2657 });
2658 }
2659
2660 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2661 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2662
2663 Some(DataReceivedResult {
2664 source_node: result.source_node,
2665 is_emergency: result.is_emergency(),
2666 is_ack: result.is_ack(),
2667 counter_changed: result.counter_changed,
2668 emergency_changed: result.emergency_changed,
2669 total_count: result.total_count,
2670 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2671 relay_data: None,
2672 origin_node: None,
2673 hop_count: 0,
2674 callsign,
2675 battery_percent,
2676 heart_rate,
2677 event_type,
2678 latitude,
2679 longitude,
2680 altitude,
2681 })
2682 }
2683
2684 fn process_document_data(
2686 &self,
2687 source_node: NodeId,
2688 data: &[u8],
2689 now_ms: u64,
2690 relay_data: Option<Vec<u8>>,
2691 origin_node: Option<NodeId>,
2692 hop_count: u8,
2693 ) -> Option<DataReceivedResult> {
2694 let source_hint = format!("node:{:08X}", source_node.as_u32());
2696 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
2697
2698 if DeltaDocument::is_delta_document(&decrypted) {
2700 return self.process_delta_document_internal(
2701 source_node,
2702 &decrypted,
2703 now_ms,
2704 relay_data,
2705 origin_node,
2706 hop_count,
2707 );
2708 }
2709
2710 let result = self.document_sync.merge_document(&decrypted)?;
2712
2713 if let Some(ref peripheral) = result.peer_peripheral {
2715 if let Ok(mut peripherals) = self.peer_peripherals.write() {
2716 peripherals.insert(result.source_node, peripheral.clone());
2717 }
2718 }
2719
2720 self.peer_manager.record_sync(source_node, now_ms);
2722
2723 if result.is_emergency() {
2725 self.notify(HiveEvent::EmergencyReceived {
2726 from_node: result.source_node,
2727 });
2728 } else if result.is_ack() {
2729 self.notify(HiveEvent::AckReceived {
2730 from_node: result.source_node,
2731 });
2732 }
2733
2734 if result.counter_changed {
2735 self.notify(HiveEvent::DocumentSynced {
2736 from_node: result.source_node,
2737 total_count: result.total_count,
2738 });
2739 }
2740
2741 if relay_data.is_some() {
2743 let relay_targets = self.get_relay_targets(Some(source_node));
2744 self.notify(HiveEvent::MessageRelayed {
2745 origin_node: origin_node.unwrap_or(result.source_node),
2746 relay_count: relay_targets.len(),
2747 hop_count,
2748 });
2749 }
2750
2751 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2752 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2753
2754 Some(DataReceivedResult {
2755 source_node: result.source_node,
2756 is_emergency: result.is_emergency(),
2757 is_ack: result.is_ack(),
2758 counter_changed: result.counter_changed,
2759 emergency_changed: result.emergency_changed,
2760 total_count: result.total_count,
2761 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2762 relay_data,
2763 origin_node,
2764 hop_count,
2765 callsign,
2766 battery_percent,
2767 heart_rate,
2768 event_type,
2769 latitude,
2770 longitude,
2771 altitude,
2772 })
2773 }
2774
2775 fn handle_relay_envelope(
2777 &self,
2778 source_node: NodeId,
2779 data: &[u8],
2780 now_ms: u64,
2781 ) -> Option<DataReceivedResult> {
2782 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
2784
2785 let relay_data = if decision.should_relay {
2787 decision.relay_data()
2788 } else {
2789 None
2790 };
2791
2792 self.process_document_data(
2794 source_node,
2795 &decision.payload,
2796 now_ms,
2797 relay_data,
2798 Some(decision.origin_node),
2799 decision.hop_count,
2800 )
2801 }
2802
2803 pub fn on_ble_data(
2812 &self,
2813 identifier: &str,
2814 data: &[u8],
2815 now_ms: u64,
2816 ) -> Option<DataReceivedResult> {
2817 if data.len() >= 2 {
2819 match data[0] {
2820 KEY_EXCHANGE_MARKER => {
2821 let _response = self.handle_key_exchange(data, now_ms);
2822 return None;
2823 }
2824 PEER_E2EE_MARKER => {
2825 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
2826 return None;
2827 }
2828 RELAY_ENVELOPE_MARKER => {
2829 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
2831 }
2832 _ => {}
2833 }
2834 }
2835
2836 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
2838 }
2839
2840 fn process_incoming_document(
2842 &self,
2843 identifier: &str,
2844 data: &[u8],
2845 now_ms: u64,
2846 relay_data: Option<Vec<u8>>,
2847 origin_node: Option<NodeId>,
2848 hop_count: u8,
2849 ) -> Option<DataReceivedResult> {
2850 let decrypted = self.decrypt_document(data, Some(identifier))?;
2852
2853 let result = self.document_sync.merge_document(&decrypted)?;
2855
2856 self.peer_manager.record_sync(result.source_node, now_ms);
2858
2859 if origin_node.is_none() {
2864 let is_new =
2866 self.peer_manager
2867 .on_incoming_connection(identifier, result.source_node, now_ms);
2868
2869 {
2871 let mut graph = self.connection_graph.lock().unwrap();
2872 if is_new {
2873 graph.on_discovered(
2874 result.source_node,
2875 identifier.to_string(),
2876 None,
2877 Some(self.config.mesh_id.clone()),
2878 -50, now_ms,
2880 );
2881 }
2882 graph.on_connected(result.source_node, now_ms);
2883 }
2884 }
2885
2886 if result.is_emergency() {
2888 self.notify(HiveEvent::EmergencyReceived {
2889 from_node: result.source_node,
2890 });
2891 } else if result.is_ack() {
2892 self.notify(HiveEvent::AckReceived {
2893 from_node: result.source_node,
2894 });
2895 }
2896
2897 if result.counter_changed {
2898 self.notify(HiveEvent::DocumentSynced {
2899 from_node: result.source_node,
2900 total_count: result.total_count,
2901 });
2902 }
2903
2904 if relay_data.is_some() {
2906 let relay_targets = self.get_relay_targets(Some(result.source_node));
2907 self.notify(HiveEvent::MessageRelayed {
2908 origin_node: origin_node.unwrap_or(result.source_node),
2909 relay_count: relay_targets.len(),
2910 hop_count,
2911 });
2912 }
2913
2914 let (callsign, battery_percent, heart_rate, event_type, latitude, longitude, altitude) =
2915 DataReceivedResult::peripheral_fields(&result.peer_peripheral);
2916
2917 Some(DataReceivedResult {
2918 source_node: result.source_node,
2919 is_emergency: result.is_emergency(),
2920 is_ack: result.is_ack(),
2921 counter_changed: result.counter_changed,
2922 emergency_changed: result.emergency_changed,
2923 total_count: result.total_count,
2924 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2925 relay_data,
2926 origin_node,
2927 hop_count,
2928 callsign,
2929 battery_percent,
2930 heart_rate,
2931 event_type,
2932 latitude,
2933 longitude,
2934 altitude,
2935 })
2936 }
2937
2938 fn handle_relay_envelope_with_incoming(
2940 &self,
2941 identifier: &str,
2942 data: &[u8],
2943 now_ms: u64,
2944 ) -> Option<DataReceivedResult> {
2945 let envelope = RelayEnvelope::decode(data)?;
2947
2948 if let Some(source_peer) = self.peer_manager.get_node_id(identifier) {
2951 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
2952 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
2953 source_peer,
2954 envelope.origin_node,
2955 envelope.hop_count,
2956 now_ms,
2957 );
2958
2959 if is_new {
2960 log::debug!(
2961 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
2962 envelope.origin_node.as_u32(),
2963 source_peer.as_u32(),
2964 envelope.hop_count
2965 );
2966 }
2967 }
2968 }
2969
2970 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2972 let stats = self
2974 .seen_cache
2975 .lock()
2976 .unwrap()
2977 .get_stats(&envelope.message_id);
2978 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2979
2980 self.notify(HiveEvent::DuplicateMessageDropped {
2981 origin_node: envelope.origin_node,
2982 seen_count,
2983 });
2984 return None;
2985 }
2986
2987 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2989 let relay_env = envelope.relay();
2990 (true, relay_env.map(|e| e.encode()))
2991 } else {
2992 if !envelope.can_relay() {
2993 self.notify(HiveEvent::MessageTtlExpired {
2994 origin_node: envelope.origin_node,
2995 hop_count: envelope.hop_count,
2996 });
2997 }
2998 (false, None)
2999 };
3000
3001 self.process_incoming_document(
3003 identifier,
3004 &envelope.payload,
3005 now_ms,
3006 if should_relay { relay_data } else { None },
3007 Some(envelope.origin_node),
3008 envelope.hop_count,
3009 )
3010 }
3011
3012 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
3022 use std::sync::atomic::Ordering;
3023
3024 let now_ms_32 = now_ms as u32;
3026
3027 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3029 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3030 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3031 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3032 let removed = self.peer_manager.cleanup_stale(now_ms);
3033 for node_id in &removed {
3034 self.notify(HiveEvent::PeerLost { node_id: *node_id });
3035 }
3036 if !removed.is_empty() {
3037 self.notify_mesh_state_changed();
3038 }
3039
3040 {
3042 let mut graph = self.connection_graph.lock().unwrap();
3043 let newly_lost = graph.tick(now_ms);
3044 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3046 drop(graph);
3047
3048 for node_id in newly_lost {
3051 if !removed.contains(&node_id) {
3053 self.notify(HiveEvent::PeerLost { node_id });
3054 }
3055 }
3056 }
3057 }
3058
3059 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3061 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3062 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3063 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3064 if self.peer_manager.connected_count() > 0 {
3066 let doc = self.document_sync.build_document();
3067 return Some(self.encrypt_document(&doc));
3068 }
3069 }
3070
3071 None
3072 }
3073
3074 pub fn tick_with_peer_deltas(&self, now_ms: u64) -> Vec<(NodeId, Vec<u8>)> {
3083 use std::sync::atomic::Ordering;
3084 let now_ms_32 = now_ms as u32;
3085
3086 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
3088 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
3089 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
3090 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
3091 let removed = self.peer_manager.cleanup_stale(now_ms);
3092 for node_id in &removed {
3093 self.notify(HiveEvent::PeerLost { node_id: *node_id });
3094 }
3095 if !removed.is_empty() {
3096 self.notify_mesh_state_changed();
3097 }
3098
3099 {
3101 let mut graph = self.connection_graph.lock().unwrap();
3102 let newly_lost = graph.tick(now_ms);
3103 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
3104 drop(graph);
3105
3106 for node_id in newly_lost {
3107 if !removed.contains(&node_id) {
3108 self.notify(HiveEvent::PeerLost { node_id });
3109 }
3110 }
3111 }
3112 }
3113
3114 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
3116 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
3117 if sync_elapsed >= self.config.sync_interval_ms as u32 {
3118 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
3119
3120 let doc = self.document_sync.build_document();
3122 let encrypted = self.encrypt_document(&doc);
3123 let mut results = Vec::new();
3124 for peer in self.get_connected_peers() {
3125 results.push((peer.node_id, encrypted.clone()));
3126 }
3127 return results;
3128 }
3129
3130 Vec::new()
3131 }
3132
    /// Returns the peers reported by the peer manager's `get_peers()`.
    pub fn get_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_peers()
    }
3139
    /// Returns the peers reported by the peer manager's `get_connected_peers()`.
    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_connected_peers()
    }
3144
    /// Looks up a single peer by node ID in the peer manager.
    ///
    /// Returns `None` when the peer manager has no entry for `node_id`.
    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
        self.peer_manager.get_peer(node_id)
    }
3149
    /// Returns the peer manager's peer count.
    pub fn peer_count(&self) -> usize {
        self.peer_manager.peer_count()
    }
3154
    /// Returns the peer manager's count of connected peers.
    pub fn connected_count(&self) -> usize {
        self.peer_manager.connected_count()
    }
3159
    /// Asks the peer manager whether a device advertising `device_mesh_id` belongs to
    /// this mesh.
    ///
    /// NOTE(review): how a `None` mesh ID is treated is decided by
    /// `PeerManager::matches_mesh` and is not visible here.
    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
        self.peer_manager.matches_mesh(device_mesh_id)
    }
3164
    /// Returns an owned snapshot of every entry in the connection graph.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
        self.connection_graph.lock().unwrap().get_all_owned()
    }
3191
    /// Returns a clone of the connection-graph entry for `node_id`, if one exists.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_peer(node_id)
            .cloned()
    }
3200
    /// Returns clones of the entries the connection graph reports via `get_connected()`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_connected()
            .into_iter()
            .cloned()
            .collect()
    }
3211
    /// Returns clones of the entries the connection graph reports via `get_degraded()`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_degraded()
            .into_iter()
            .cloned()
            .collect()
    }
3222
    /// Returns clones of the entries the connection graph reports via
    /// `get_recently_disconnected(within_ms, now_ms)`.
    ///
    /// `within_ms` and `now_ms` are forwarded unchanged; presumably they select peers
    /// that disconnected within `within_ms` of `now_ms` — confirm against
    /// `ConnectionStateGraph`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_recently_disconnected(
        &self,
        within_ms: u64,
        now_ms: u64,
    ) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_recently_disconnected(within_ms, now_ms)
            .into_iter()
            .cloned()
            .collect()
    }
3239
    /// Returns clones of the entries the connection graph reports via `get_lost()`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_lost()
            .into_iter()
            .cloned()
            .collect()
    }
3250
    /// Returns the connection graph's `state_counts()` summary.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_connection_state_counts(&self) -> StateCountSummary {
        self.connection_graph.lock().unwrap().state_counts()
    }
3255
    /// Returns an owned snapshot of the indirect peers tracked by the connection graph.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_indirect_peers_owned()
    }
3269
    /// Returns the connection graph's `peer_degree` for `node_id`, or `None` when the
    /// graph has no answer for that node.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
        self.connection_graph.lock().unwrap().peer_degree(node_id)
    }
3279
    /// Returns the connection graph's `full_state_counts()` summary.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
        self.connection_graph.lock().unwrap().full_state_counts()
    }
3287
    /// Returns the connection graph's known paths to `node_id`.
    ///
    /// Each entry is a `(NodeId, u8)` pair; presumably the relaying peer and the hop
    /// count through it — confirm against `ConnectionStateGraph::get_paths_to`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
        self.connection_graph.lock().unwrap().get_paths_to(node_id)
    }
3295
    /// Returns whether the connection graph reports `node_id` as known.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
        self.connection_graph.lock().unwrap().is_known(node_id)
    }
3300
    /// Returns the number of indirect peers tracked by the connection graph.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn indirect_peer_count(&self) -> usize {
        self.connection_graph.lock().unwrap().indirect_peer_count()
    }
3305
    /// Runs the connection graph's `cleanup_indirect(now_ms)` and returns its result.
    ///
    /// The returned node IDs are presumably the indirect peers that were removed —
    /// confirm against `ConnectionStateGraph::cleanup_indirect`.
    ///
    /// # Panics
    ///
    /// Panics if the connection-graph mutex is poisoned.
    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
        self.connection_graph
            .lock()
            .unwrap()
            .cleanup_indirect(now_ms)
    }
3316
    /// Returns the total count reported by the document sync layer.
    pub fn total_count(&self) -> u64 {
        self.document_sync.total_count()
    }
3321
    /// Returns the document sync layer's version.
    ///
    /// Returns the same value as [`Self::version`]; both read `document_sync.version()`.
    pub fn document_version(&self) -> u32 {
        self.document_sync.version()
    }
3326
    /// Returns the document sync layer's version.
    ///
    /// Returns the same value as [`Self::document_version`]; both read
    /// `document_sync.version()`.
    pub fn version(&self) -> u32 {
        self.document_sync.version()
    }
3331
    /// Forwards a new battery percentage to the document sync layer.
    pub fn update_health(&self, battery_percent: u8) {
        self.document_sync.update_health(battery_percent);
    }
3336
    /// Forwards a new activity value to the document sync layer.
    ///
    /// NOTE(review): the meaning and valid range of `activity` are defined by
    /// `DocumentSync::update_activity`, not here.
    pub fn update_activity(&self, activity: u8) {
        self.document_sync.update_activity(activity);
    }
3341
    /// Forwards battery percentage and activity together to the document sync layer.
    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
        self.document_sync
            .update_health_full(battery_percent, activity);
    }
3347
    /// Forwards a new heart-rate reading to the document sync layer.
    pub fn update_heart_rate(&self, heart_rate: u8) {
        self.document_sync.update_heart_rate(heart_rate);
    }
3352
    /// Forwards a new location (latitude, longitude, optional altitude) to the document
    /// sync layer.
    ///
    /// NOTE(review): units are not visible here — presumably degrees and metres; confirm.
    pub fn update_location(&self, latitude: f32, longitude: f32, altitude: Option<f32>) {
        self.document_sync
            .update_location(latitude, longitude, altitude);
    }
3358
    /// Asks the document sync layer to clear the stored location.
    pub fn clear_location(&self) {
        self.document_sync.clear_location();
    }
3363
    /// Forwards a new callsign to the document sync layer.
    pub fn update_callsign(&self, callsign: &str) {
        self.document_sync.update_callsign(callsign);
    }
3368
    /// Forwards a peripheral event of `event_type` with the given `timestamp` to the
    /// document sync layer.
    pub fn set_peripheral_event(&self, event_type: EventType, timestamp: u64) {
        self.document_sync
            .set_peripheral_event(event_type, timestamp);
    }
3374
    /// Asks the document sync layer to clear the current peripheral event.
    pub fn clear_peripheral_event(&self) {
        self.document_sync.clear_peripheral_event();
    }
3379
    /// Forwards a complete peripheral state update to the document sync layer in a
    /// single call.
    ///
    /// All arguments are passed through unchanged, in the same order, to
    /// `DocumentSync::update_peripheral_state`. How `None` values are treated (cleared
    /// versus left untouched) is decided there.
    // The lint is silenced because the signature mirrors the delegate's parameter list.
    #[allow(clippy::too_many_arguments)]
    pub fn update_peripheral_state(
        &self,
        callsign: &str,
        battery_percent: u8,
        heart_rate: Option<u8>,
        latitude: Option<f32>,
        longitude: Option<f32>,
        altitude: Option<f32>,
        event_type: Option<EventType>,
        timestamp: u64,
    ) {
        self.document_sync.update_peripheral_state(
            callsign,
            battery_percent,
            heart_rate,
            latitude,
            longitude,
            altitude,
            event_type,
            timestamp,
        );
    }
3407
3408 pub fn build_document(&self) -> Vec<u8> {
3412 let doc = self.document_sync.build_document();
3413 self.encrypt_document(&doc)
3414 }
3415
    /// Returns the peers the peer manager reports as needing a sync at `now_ms`.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
3420
    /// Dispatches `event` to the registered observers.
    fn notify(&self, event: HiveEvent) {
        self.observers.notify(event);
    }
3426
3427 fn notify_mesh_state_changed(&self) {
3428 self.notify(HiveEvent::MeshStateChanged {
3429 peer_count: self.peer_manager.peer_count(),
3430 connected_count: self.peer_manager.connected_count(),
3431 });
3432 }
3433
3434 pub fn check_canned_message(&self, source_node: u32, timestamp: u64, _ttl_ms: u64) -> bool {
3454 let mut id_bytes = [0u8; 16];
3457 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3458 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3459 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3460
3461 let seen = self.seen_cache.lock().unwrap();
3463 !seen.has_seen(&message_id)
3464 }
3465
3466 pub fn mark_canned_message_seen(&self, source_node: u32, timestamp: u64) {
3471 let now = std::time::SystemTime::now()
3472 .duration_since(std::time::UNIX_EPOCH)
3473 .map(|d| d.as_millis() as u64)
3474 .unwrap_or(0);
3475
3476 let mut id_bytes = [0u8; 16];
3478 id_bytes[0..4].copy_from_slice(&source_node.to_le_bytes());
3479 id_bytes[4..12].copy_from_slice(×tamp.to_le_bytes());
3480 let message_id = crate::relay::MessageId::from_bytes(id_bytes);
3481 let origin = NodeId::new(source_node);
3482
3483 let mut seen = self.seen_cache.lock().unwrap();
3484 seen.mark_seen(message_id, origin, now);
3485 }
3486
    /// Returns the identifiers of connected peers as reported by the peer manager.
    ///
    /// NOTE(review): these are presumably the platform identifiers passed to
    /// `on_ble_discovered` / `on_ble_connected` — confirm against `PeerManager`.
    pub fn get_connected_peer_identifiers(&self) -> Vec<String> {
        self.peer_manager.get_connected_identifiers()
    }
3494}
3495
/// Summary of a received document, returned to the caller of the data-received entry
/// points.
///
/// NOTE(review): the code that populates this struct is outside this excerpt; field
/// notes marked "presumably" are inferred from names and usage and should be confirmed.
#[derive(Debug, Clone)]
pub struct DataReceivedResult {
    /// Node the data came from. The tests in this file expect it to equal the sender's
    /// node ID.
    pub source_node: NodeId,

    /// Set when the received document carries an emergency (asserted by the
    /// emergency-flow tests in this file).
    pub is_emergency: bool,

    /// Set when the received document carries an acknowledgement (asserted by the
    /// ack-flow test in this file).
    pub is_ack: bool,

    /// Presumably whether merging the document changed the local counter — confirm.
    pub counter_changed: bool,

    /// Presumably whether merging the document changed emergency state — confirm.
    pub emergency_changed: bool,

    /// Presumably the document total count after processing — confirm.
    pub total_count: u64,

    /// Presumably the timestamp of the carried event — confirm.
    pub event_timestamp: u64,

    /// Bytes to forward to other peers, if any. Presumably the encoded relay envelope
    /// when the message should be relayed — confirm.
    pub relay_data: Option<Vec<u8>>,

    /// Originating node. Presumably set only for relayed messages — confirm.
    pub origin_node: Option<NodeId>,

    /// Hop count associated with the message.
    pub hop_count: u8,

    /// Sender's callsign. `peripheral_fields` yields `None` for an empty callsign.
    pub callsign: Option<String>,

    /// Sender's battery percentage. `peripheral_fields` yields `None` when the
    /// reported value is `0`.
    pub battery_percent: Option<u8>,

    /// Sender's heart rate, when reported.
    pub heart_rate: Option<u8>,

    /// Type of the sender's last event as a raw `u8`, when present.
    pub event_type: Option<u8>,

    /// Sender's latitude, when a location is present.
    pub latitude: Option<f32>,

    /// Sender's longitude, when a location is present.
    pub longitude: Option<f32>,

    /// Sender's altitude, when a location with altitude is present.
    pub altitude: Option<f32>,
}
3554
3555impl DataReceivedResult {
3556 #[allow(clippy::type_complexity)]
3558 fn peripheral_fields(
3559 peripheral: &Option<crate::sync::crdt::Peripheral>,
3560 ) -> (
3561 Option<String>,
3562 Option<u8>,
3563 Option<u8>,
3564 Option<u8>,
3565 Option<f32>,
3566 Option<f32>,
3567 Option<f32>,
3568 ) {
3569 match peripheral {
3570 Some(p) => {
3571 let callsign = {
3572 let s = p.callsign_str();
3573 if s.is_empty() {
3574 None
3575 } else {
3576 Some(s.to_string())
3577 }
3578 };
3579 let battery = if p.health.battery_percent > 0 {
3580 Some(p.health.battery_percent)
3581 } else {
3582 None
3583 };
3584 let heart_rate = p.health.heart_rate;
3585 let event_type = p.last_event.as_ref().map(|e| e.event_type as u8);
3586 let (lat, lon, alt) = match &p.location {
3587 Some(loc) => (Some(loc.latitude), Some(loc.longitude), loc.altitude),
3588 None => (None, None, None),
3589 };
3590 (callsign, battery, heart_rate, event_type, lat, lon, alt)
3591 }
3592 None => (None, None, None, None, None, None, None),
3593 }
3594 }
3595}
3596
/// Result of evaluating a relay envelope: the carried payload plus the information
/// needed to forward it.
///
/// NOTE(review): the code that constructs this struct is outside this excerpt.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Payload bytes carried by the message.
    pub payload: Vec<u8>,

    /// Node that originated the message.
    pub origin_node: NodeId,

    /// Hop count associated with the message.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward; [`RelayDecision::relay_data`] encodes it when present.
    pub relay_envelope: Option<RelayEnvelope>,
}
3617
impl RelayDecision {
    /// Returns the encoded bytes of `relay_envelope`, or `None` when there is no
    /// envelope to forward.
    pub fn relay_data(&self) -> Option<Vec<u8>> {
        self.relay_envelope.as_ref().map(|e| e.encode())
    }
}
3626
3627#[cfg(all(test, feature = "std"))]
3628mod tests {
3629 use super::*;
3630 use crate::observer::CollectingObserver;
3631
3632 const TEST_TIMESTAMP: u64 = 1705276800000;
3634
3635 fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
3636 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
3637 HiveMesh::new(config)
3638 }
3639
    /// A freshly created mesh exposes the configured node ID, callsign and mesh ID, and
    /// derives its device name from the mesh ID and node ID.
    #[test]
    fn test_mesh_creation() {
        let mesh = create_mesh(0x12345678, "ALPHA-1");

        assert_eq!(mesh.node_id().as_u32(), 0x12345678);
        assert_eq!(mesh.callsign(), "ALPHA-1");
        assert_eq!(mesh.mesh_id(), "TEST");
        assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
    }
3649
    /// Discovering a device in the same mesh yields a peer whose node ID is parsed from
    /// the device name, and emits `PeerDiscovered` and `MeshStateChanged`.
    #[test]
    fn test_peer_discovery() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        let peer = mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );

        assert!(peer.is_some());
        let peer = peer.unwrap();
        assert_eq!(peer.node_id.as_u32(), 0x22222222);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
    }
3678
    /// Discover → connect → disconnect updates the connected count and emits
    /// `PeerConnected` and `PeerDisconnected`.
    #[test]
    fn test_connection_lifecycle() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // The peer must be discovered first so the identifier maps to a node ID.
        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );

        let node_id = mesh.on_ble_connected("device-uuid", 2000);
        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
        assert_eq!(mesh.connected_count(), 1);

        let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
        assert_eq!(node_id, Some(NodeId::new(0x22222222)));
        assert_eq!(mesh.connected_count(), 0);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
    }
3712
    /// An emergency sent by one mesh is reported as an emergency by the receiver and
    /// triggers `EmergencyReceived`.
    #[test]
    fn test_emergency_flow() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        let doc = mesh1.send_emergency(TEST_TIMESTAMP);
        assert!(mesh1.is_emergency_active());

        // Deliver the sender's document directly to the receiver.
        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_emergency);
        assert_eq!(result.source_node.as_u32(), 0x11111111);

        let events = observer2.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
    }
3740
    /// An ACK sent by one mesh is reported as an ACK by the receiver and triggers
    /// `AckReceived`.
    #[test]
    fn test_ack_flow() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        let doc = mesh1.send_ack(TEST_TIMESTAMP);
        assert!(mesh1.is_ack_active());

        // Deliver the sender's document directly to the receiver.
        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_ack);

        let events = observer2.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
    }
3767
    /// With a 10 s peer timeout, a peer last seen at t=1000 survives a tick at t=5000
    /// but is removed by a tick at t=20000, which also emits `PeerLost`.
    #[test]
    fn test_tick_cleanup() {
        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_peer_timeout(10_000);
        let mesh = HiveMesh::new(config);

        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );
        assert_eq!(mesh.peer_count(), 1);

        // Still within the timeout window.
        mesh.tick(5000);
        assert_eq!(mesh.peer_count(), 1);

        // Past the timeout window.
        mesh.tick(20000);
        assert_eq!(mesh.peer_count(), 0);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
    }
3801
    /// With a 5 s sync interval and one connected peer, `tick` yields a document only
    /// when at least 5 s have elapsed since the previous sync.
    #[test]
    fn test_tick_sync_broadcast() {
        let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_sync_interval(5000);
        let mesh = HiveMesh::new(config);

        mesh.on_ble_discovered(
            "device-uuid",
            Some("HIVE_TEST-22222222"),
            -65,
            Some("TEST"),
            1000,
        );
        mesh.on_ble_connected("device-uuid", 1000);

        // Result of the first tick is deliberately ignored; it only establishes a baseline.
        let _result = mesh.tick(0);
        let result = mesh.tick(3000);
        assert!(result.is_none());

        let result = mesh.tick(6000);
        assert!(result.is_some());

        // Only 100 ms since the last sync.
        let result = mesh.tick(6100);
        assert!(result.is_none());

        let result = mesh.tick(12000);
        assert!(result.is_some());
    }
3838
    /// An incoming connection from an unknown node registers it as a new, connected
    /// peer and emits both `PeerDiscovered` and `PeerConnected`.
    #[test]
    fn test_incoming_connection() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);

        assert!(is_new);
        assert_eq!(mesh.peer_count(), 1);
        assert_eq!(mesh.connected_count(), 1);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
    }
3861
    /// Devices advertising a different mesh ID are ignored; devices in the same mesh are
    /// accepted.
    #[test]
    fn test_mesh_filtering() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        // Different mesh ("OTHER"): must be rejected.
        let peer = mesh.on_ble_discovered(
            "device-uuid-1",
            Some("HIVE_OTHER-22222222"),
            -65,
            Some("OTHER"),
            1000,
        );
        assert!(peer.is_none());
        assert_eq!(mesh.peer_count(), 0);

        // Same mesh ("TEST"): must be accepted.
        let peer = mesh.on_ble_discovered(
            "device-uuid-2",
            Some("HIVE_TEST-33333333"),
            -65,
            Some("TEST"),
            1000,
        );
        assert!(peer.is_some());
        assert_eq!(mesh.peer_count(), 1);
    }
3888
3889 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
3892 let config =
3893 HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
3894 HiveMesh::new(config)
3895 }
3896
    /// A mesh configured with a secret reports encryption as enabled.
    #[test]
    fn test_encryption_enabled() {
        let secret = [0x42u8; 32];
        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
    }
3904
    /// A mesh created without a secret reports encryption as disabled.
    #[test]
    fn test_encryption_disabled_by_default() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        assert!(!mesh.is_encryption_enabled());
    }
3911
    /// Two meshes sharing a secret can exchange a document; the wire format starts with
    /// `ENCRYPTED_MARKER`.
    #[test]
    fn test_encrypted_document_exchange() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let doc = mesh1.build_document();

        assert!(doc.len() >= 2);
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.source_node.as_u32(), 0x11111111);
    }
3932
    /// An emergency survives encryption: the receiver with the same secret reports it
    /// and emits `EmergencyReceived`.
    #[test]
    fn test_encrypted_emergency_exchange() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let observer = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer.clone());

        let doc = mesh1.send_emergency(TEST_TIMESTAMP);

        let result =
            mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);

        assert!(result.is_some());
        let result = result.unwrap();
        assert!(result.is_emergency);

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
    }
3959
3960 #[test]
3961 fn test_wrong_key_fails_decrypt() {
3962 let secret1 = [0x42u8; 32];
3963 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3965 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3966
3967 let doc = mesh1.build_document();
3969
3970 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3972
3973 assert!(result.is_none());
3974 }
3975
    /// Two unencrypted meshes can exchange a plain document.
    #[test]
    fn test_unencrypted_mesh_can_read_unencrypted() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        let doc = mesh1.build_document();

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);

        assert!(result.is_some());
    }
3989
3990 #[test]
3991 fn test_encrypted_mesh_can_receive_unencrypted() {
3992 let secret = [0x42u8; 32];
3994 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
3999
4000 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4002
4003 assert!(result.is_some());
4004 }
4005
4006 #[test]
4007 fn test_unencrypted_mesh_cannot_receive_encrypted() {
4008 let secret = [0x42u8; 32];
4009 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
4014
4015 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4017
4018 assert!(result.is_none());
4019 }
4020
    /// Encryption can be toggled at runtime, and the first byte of the built document
    /// reflects the current setting.
    #[test]
    fn test_enable_disable_encryption() {
        let mut mesh = create_mesh(0x11111111, "ALPHA-1");

        assert!(!mesh.is_encryption_enabled());

        let secret = [0x42u8; 32];
        mesh.enable_encryption(&secret);
        assert!(mesh.is_encryption_enabled());

        let doc = mesh.build_document();
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        mesh.disable_encryption();
        assert!(!mesh.is_encryption_enabled());

        let doc = mesh.build_document();
        assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
    }
4044
4045 #[test]
4046 fn test_encryption_overhead() {
4047 let secret = [0x42u8; 32];
4048 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
4049 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
4050
4051 let doc_encrypted = mesh_encrypted.build_document();
4052 let doc_unencrypted = mesh_unencrypted.build_document();
4053
4054 let overhead = doc_encrypted.len() - doc_unencrypted.len();
4060 assert_eq!(overhead, 30); }
4062
    /// Peer E2EE is off by default, exposes a public key once enabled, and can be
    /// disabled again.
    #[test]
    fn test_peer_e2ee_enable_disable() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        assert!(!mesh.is_peer_e2ee_enabled());
        assert!(mesh.peer_e2ee_public_key().is_none());

        mesh.enable_peer_e2ee();
        assert!(mesh.is_peer_e2ee_enabled());
        assert!(mesh.peer_e2ee_public_key().is_some());

        mesh.disable_peer_e2ee();
        assert!(!mesh.is_peer_e2ee_enabled());
    }
4079
    /// Initiating a session produces a key-exchange message and creates a session that
    /// is tracked but not yet established.
    #[test]
    fn test_peer_e2ee_initiate_session() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        mesh.enable_peer_e2ee();

        let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert!(key_exchange.is_some());

        let key_exchange = key_exchange.unwrap();
        assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);

        // One session exists, but the handshake has not completed.
        assert_eq!(mesh.peer_e2ee_session_count(), 1);
        assert_eq!(mesh.peer_e2ee_established_count(), 0);
    }
4096
    /// A full key exchange between two meshes establishes a session on both sides and
    /// emits `PeerE2eeEstablished` on both.
    #[test]
    fn test_peer_e2ee_full_handshake() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        let observer1 = Arc::new(CollectingObserver::new());
        let observer2 = Arc::new(CollectingObserver::new());
        mesh1.add_observer(observer1.clone());
        mesh2.add_observer(observer2.clone());

        // Step 1: mesh1 initiates.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();

        // Step 2: mesh2 handles the request and answers.
        let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
        assert!(response.is_some());

        assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));

        // Step 3: mesh1 processes the answer.
        let key_exchange2 = response.unwrap();
        let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);

        assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));

        let events1 = observer1.events();
        assert!(events1
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));

        let events2 = observer2.events();
        assert!(events2
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
    }
4140
    /// After a handshake, a message encrypted by one side starts with
    /// `PEER_E2EE_MARKER`, decrypts to the original plaintext on the other side, and
    /// emits `PeerE2eeMessageReceived` with the sender and payload.
    #[test]
    fn test_peer_e2ee_encrypt_decrypt() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Complete the handshake.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        let plaintext = b"Secret message from mesh1";
        let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
        assert!(encrypted.is_some());

        let encrypted = encrypted.unwrap();
        assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);

        // The observer is attached only now, so it sees just the message event.
        let observer2 = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer2.clone());

        let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
        assert!(decrypted.is_some());
        assert_eq!(decrypted.unwrap(), plaintext);

        let events = observer2.events();
        assert!(events.iter().any(|e| matches!(
            e,
            HiveEvent::PeerE2eeMessageReceived { from_node, data }
            if from_node.as_u32() == 0x11111111 && data == plaintext
        )));
    }
4181
    /// After a handshake, both sides can send encrypted messages that the other side
    /// decrypts correctly.
    #[test]
    fn test_peer_e2ee_bidirectional() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Complete the handshake.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        // mesh1 -> mesh2
        let msg1 = mesh1
            .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
            .unwrap();
        let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
        assert_eq!(dec1, b"Hello from mesh1");

        // mesh2 -> mesh1
        let msg2 = mesh2
            .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
            .unwrap();
        let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
        assert_eq!(dec2, b"Hello from mesh2");
    }
4211
    /// Closing a peer E2EE session emits `PeerE2eeClosed`.
    #[test]
    fn test_peer_e2ee_close_session() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");
        mesh.enable_peer_e2ee();

        let observer = Arc::new(CollectingObserver::new());
        mesh.add_observer(observer.clone());

        // Create a session to close; the key-exchange bytes themselves are not needed.
        mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert_eq!(mesh.peer_e2ee_session_count(), 1);

        mesh.close_peer_e2ee(NodeId::new(0x22222222));

        let events = observer.events();
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::PeerE2eeClosed { .. })));
    }
4233
    /// Without enabling peer E2EE, initiating a session and sending both return `None`
    /// and no session exists.
    #[test]
    fn test_peer_e2ee_without_enabling() {
        let mesh = create_mesh(0x11111111, "ALPHA-1");

        let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
        assert!(result.is_none());

        let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
        assert!(result.is_none());

        assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
    }
4247
    /// A peer E2EE message is exactly 46 bytes longer than its plaintext.
    ///
    /// NOTE(review): the breakdown of the 46 bytes is defined by the peer E2EE wire
    /// format, which is not visible here — confirm there.
    #[test]
    fn test_peer_e2ee_overhead() {
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_mesh(0x22222222, "BRAVO-1");

        mesh1.enable_peer_e2ee();
        mesh2.enable_peer_e2ee();

        // Complete the handshake.
        let key_exchange1 = mesh1
            .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
            .unwrap();
        let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
        mesh1.handle_key_exchange(&key_exchange2, 1000);

        let plaintext = b"Test message";
        let encrypted = mesh1
            .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
            .unwrap();

        let overhead = encrypted.len() - plaintext.len();
        assert_eq!(overhead, 46);
    }
4280
4281 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
4284 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
4285 .with_encryption(secret)
4286 .with_strict_encryption();
4287 HiveMesh::new(config)
4288 }
4289
    /// A mesh configured with a secret and strict mode reports both as enabled.
    #[test]
    fn test_strict_encryption_enabled() {
        let secret = [0x42u8; 32];
        let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
        assert!(mesh.is_strict_encryption_enabled());
    }
4298
    /// Enabling encryption alone does not enable strict mode.
    #[test]
    fn test_strict_encryption_disabled_by_default() {
        let secret = [0x42u8; 32];
        let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);

        assert!(mesh.is_encryption_enabled());
        assert!(!mesh.is_strict_encryption_enabled());
    }
4307
4308 #[test]
4309 fn test_strict_encryption_requires_encryption_enabled() {
4310 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
4312 .with_strict_encryption(); let mesh = HiveMesh::new(config);
4314
4315 assert!(!mesh.is_encryption_enabled());
4316 assert!(!mesh.is_strict_encryption_enabled());
4317 }
4318
    /// A strict-mode receiver accepts a document encrypted with the shared secret.
    #[test]
    fn test_strict_mode_accepts_encrypted_documents() {
        let secret = [0x42u8; 32];
        let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let doc = mesh1.build_document();
        assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);

        let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
        assert!(result.is_some());
    }
4333
4334 #[test]
4335 fn test_strict_mode_rejects_unencrypted_documents() {
4336 let secret = [0x42u8; 32];
4337 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); let observer = Arc::new(CollectingObserver::new());
4341 mesh2.add_observer(observer.clone());
4342
4343 let doc = mesh1.build_document();
4345 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
4346
4347 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4349 assert!(result.is_none());
4350
4351 let events = observer.events();
4353 assert!(events.iter().any(|e| matches!(
4354 e,
4355 HiveEvent::SecurityViolation {
4356 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
4357 ..
4358 }
4359 )));
4360 }
4361
4362 #[test]
4363 fn test_non_strict_mode_accepts_unencrypted_documents() {
4364 let secret = [0x42u8; 32];
4365 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
4370
4371 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
4373 assert!(result.is_some());
4374 }
4375
    /// When a plain document arrives via a known device identifier in strict mode, the
    /// resulting `SecurityViolation` event carries that identifier as its `source`.
    #[test]
    fn test_strict_mode_security_violation_event_includes_source() {
        let secret = [0x42u8; 32];
        let mesh1 = create_mesh(0x11111111, "ALPHA-1");
        let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);

        let observer = Arc::new(CollectingObserver::new());
        mesh2.add_observer(observer.clone());

        let doc = mesh1.build_document();

        // Register and connect the sender so the identifier is known to mesh2.
        mesh2.on_ble_discovered(
            "test-device-uuid",
            Some("HIVE_TEST-11111111"),
            -65,
            Some("TEST"),
            500,
        );
        mesh2.on_ble_connected("test-device-uuid", 600);

        // The return value is irrelevant here; only the emitted event is inspected.
        let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);

        let events = observer.events();
        let violation = events.iter().find(|e| {
            matches!(
                e,
                HiveEvent::SecurityViolation {
                    kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
                    ..
                }
            )
        });
        assert!(violation.is_some());

        if let Some(HiveEvent::SecurityViolation { source, .. }) = violation {
            assert!(source.is_some());
            assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
        }
    }
4417
/// A document encrypted with a different secret is rejected and reported to
/// observers as a `DecryptionFailed` security violation.
#[test]
fn test_decryption_failure_emits_security_violation() {
    use crate::observer::SecurityViolationKind;

    // The two meshes deliberately use different secrets.
    let sender_secret = [0x42u8; 32];
    let receiver_secret = [0x43u8; 32];
    let sender = create_encrypted_mesh(0x11111111, "ALPHA-1", sender_secret);
    let receiver = create_encrypted_mesh(0x22222222, "BRAVO-1", receiver_secret);

    let recorder = Arc::new(CollectingObserver::new());
    receiver.add_observer(recorder.clone());

    let wire_bytes = sender.build_document();
    let outcome =
        receiver.on_ble_data_received_from_node(NodeId::new(0x11111111), &wire_bytes, 1000);
    assert!(
        outcome.is_none(),
        "document encrypted with a different secret should be rejected"
    );

    let recorded = recorder.events();
    let saw_failure = recorded.iter().any(|event| {
        matches!(
            event,
            HiveEvent::SecurityViolation {
                kind: SecurityViolationKind::DecryptionFailed,
                ..
            }
        )
    });
    assert!(saw_failure);
}
4445
/// Encryption and strict-mode flags survive being chained with other
/// builder methods on `HiveMeshConfig`.
#[test]
fn test_strict_mode_builder_chain() {
    let shared_secret = [0x42u8; 32];

    let mesh = HiveMesh::new(
        HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
            .with_encryption(shared_secret)
            .with_strict_encryption()
            .with_sync_interval(10_000)
            .with_peer_timeout(60_000),
    );

    assert!(mesh.is_encryption_enabled());
    assert!(mesh.is_strict_encryption_enabled());
}
4460
4461 fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
4464 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
4465 HiveMesh::new(config)
4466 }
4467
/// A mesh created through the plain `create_mesh` helper reports relay as
/// disabled.
#[test]
fn test_relay_disabled_by_default() {
    let relay_on = create_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(!relay_on, "relay should be off unless explicitly enabled");
}
4473
/// A mesh built with `with_relay()` reports relay as enabled.
#[test]
fn test_relay_enabled() {
    let relay_on = create_relay_mesh(0x11111111, "ALPHA-1").is_relay_enabled();
    assert!(relay_on, "relay should be on after with_relay()");
}
4479
/// Each relay-related builder method stores its argument in the matching
/// `HiveMeshConfig` field.
#[test]
fn test_relay_config_builder() {
    let HiveMeshConfig {
        enable_relay,
        max_relay_hops,
        relay_fanout,
        seen_cache_ttl_ms,
        ..
    } = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
        .with_relay()
        .with_max_relay_hops(5)
        .with_relay_fanout(3)
        .with_seen_cache_ttl(60_000);

    assert!(enable_relay);
    assert_eq!(max_relay_hops, 5);
    assert_eq!(relay_fanout, 3);
    assert_eq!(seen_cache_ttl_ms, 60_000);
}
4493
/// Marking the same message id twice reports it as new only the first time
/// and leaves a single entry in the seen cache.
#[test]
fn test_seen_message_deduplication() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);
    let msg_id = MessageId::from_content(origin, 1000, 0xDEADBEEF);

    let first_time = mesh.mark_message_seen(msg_id, origin, 1000);
    assert!(first_time, "first sighting should be reported as new");

    let second_time = mesh.mark_message_seen(msg_id, origin, 2000);
    assert!(!second_time, "repeat sighting should be reported as duplicate");

    assert_eq!(mesh.seen_cache_size(), 1);
}
4508
/// `wrap_for_relay` produces bytes that start with the relay marker and
/// decode to an envelope carrying the original payload, this node as the
/// origin, and a hop count of zero.
#[test]
fn test_wrap_for_relay() {
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let body = vec![1, 2, 3, 4, 5];
    let framed = mesh.wrap_for_relay(body.clone());
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.payload, body);
    assert_eq!(decoded.origin_node, NodeId::new(0x11111111));
    assert_eq!(decoded.hop_count, 0);
}
4525
/// A previously unseen broadcast envelope with hops remaining is delivered
/// and scheduled for relay with its hop count incremented.
#[test]
fn test_process_relay_envelope_new_message() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    mesh.add_observer(recorder.clone());

    let body = vec![1, 2, 3, 4, 5];
    let encoded = RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone())
        .with_max_hops(7)
        .encode();

    let verdict = mesh
        .process_relay_envelope(&encoded, NodeId::new(0x33333333), 1000)
        .expect("a fresh envelope should yield a relay decision");

    assert_eq!(verdict.payload, body);
    assert_eq!(verdict.origin_node.as_u32(), 0x22222222);
    assert_eq!(verdict.hop_count, 0);
    assert!(verdict.should_relay);

    // The envelope prepared for forwarding has travelled one more hop.
    let forwarded = verdict
        .relay_envelope
        .expect("a relayable decision should carry the envelope to forward");
    assert_eq!(forwarded.hop_count, 1);
}
4554
/// Processing the same encoded envelope twice yields a decision only the
/// first time; the second attempt emits `DuplicateMessageDropped`.
#[test]
fn test_process_relay_envelope_duplicate() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    mesh.add_observer(recorder.clone());

    let body = vec![1, 2, 3, 4, 5];
    let encoded = RelayEnvelope::broadcast(NodeId::new(0x22222222), body).encode();

    let first_pass = mesh.process_relay_envelope(&encoded, NodeId::new(0x33333333), 1000);
    assert!(first_pass.is_some(), "first delivery should be processed");

    let second_pass = mesh.process_relay_envelope(&encoded, NodeId::new(0x33333333), 2000);
    assert!(second_pass.is_none(), "repeat delivery should be dropped");

    let recorded = recorder.events();
    let saw_duplicate_drop = recorded
        .iter()
        .any(|event| matches!(event, HiveEvent::DuplicateMessageDropped { .. }));
    assert!(saw_duplicate_drop);
}
4579
/// An envelope that has already been relayed `max_hops` times is still
/// delivered locally but is not forwarded, and `MessageTtlExpired` is
/// emitted.
#[test]
fn test_process_relay_envelope_ttl_expired() {
    use crate::relay::RelayEnvelope;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let recorder = Arc::new(CollectingObserver::new());
    mesh.add_observer(recorder.clone());

    let body = vec![1, 2, 3, 4, 5];
    let mut exhausted =
        RelayEnvelope::broadcast(NodeId::new(0x22222222), body.clone()).with_max_hops(3);

    // Relay three times so the envelope has used up its three allowed hops.
    for _ in 0..3 {
        exhausted = exhausted.relay().unwrap();
    }
    let encoded = exhausted.encode();

    let verdict = mesh
        .process_relay_envelope(&encoded, NodeId::new(0x33333333), 1000)
        .expect("an exhausted envelope should still yield a decision");

    assert_eq!(verdict.payload, body);
    assert!(!verdict.should_relay);
    assert!(verdict.relay_envelope.is_none());

    let recorded = recorder.events();
    let saw_ttl_expiry = recorded
        .iter()
        .any(|event| matches!(event, HiveEvent::MessageTtlExpired { .. }));
    assert!(saw_ttl_expiry);
}
4614
/// `build_relay_document` yields a relay envelope originating from this
/// node whose payload decodes as a `HiveDocument`.
#[test]
fn test_build_relay_document() {
    use crate::document::HiveDocument;
    use crate::relay::{RelayEnvelope, RELAY_ENVELOPE_MARKER};

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    let framed = mesh.build_relay_document();
    assert_eq!(framed[0], RELAY_ENVELOPE_MARKER);

    let decoded = RelayEnvelope::decode(&framed).unwrap();
    assert_eq!(decoded.origin_node.as_u32(), 0x11111111);

    // The wrapped payload must itself be a decodable document.
    assert!(HiveDocument::decode(&decoded.payload).is_some());
}
4632
/// `get_relay_targets` never returns the peer the message was received
/// from.
#[test]
fn test_relay_targets_excludes_source() {
    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");

    // Discover and connect three peers: (device id, advertised name, RSSI).
    let peers = [
        ("peer-1", "HIVE_TEST-22222222", -60),
        ("peer-2", "HIVE_TEST-33333333", -65),
        ("peer-3", "HIVE_TEST-44444444", -70),
    ];
    for &(device_id, advertised_name, rssi) in &peers {
        mesh.on_ble_discovered(device_id, Some(advertised_name), rssi, Some("TEST"), 1000);
        mesh.on_ble_connected(device_id, 1000);
    }

    let excluded = 0x33333333;
    let targets = mesh.get_relay_targets(Some(NodeId::new(excluded)));

    // NOTE(review): this check also passes when `targets` is empty; consider
    // additionally asserting on the peers that are expected to be present.
    let contains_source = targets
        .iter()
        .any(|peer| peer.node_id.as_u32() == excluded);
    assert!(!contains_source);
}
4671
/// `clear_seen_cache` empties a seen cache that holds two distinct
/// message ids.
#[test]
fn test_clear_seen_cache() {
    use crate::relay::MessageId;

    let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
    let origin = NodeId::new(0x22222222);

    // Two distinct message ids so the cache holds two entries.
    let first_id = MessageId::from_content(origin, 1000, 0x11111111);
    mesh.mark_message_seen(first_id, origin, 1000);

    let second_id = MessageId::from_content(origin, 2000, 0x22222222);
    mesh.mark_message_seen(second_id, origin, 2000);

    assert_eq!(mesh.seen_cache_size(), 2);

    mesh.clear_seen_cache();
    assert_eq!(mesh.seen_cache_size(), 0);
}
4695}