1#[cfg(not(feature = "std"))]
57use alloc::{string::String, sync::Arc, vec::Vec};
58#[cfg(feature = "std")]
59use std::sync::Arc;
60
61use crate::document::{ENCRYPTED_MARKER, KEY_EXCHANGE_MARKER, PEER_E2EE_MARKER};
62use crate::document_sync::DocumentSync;
63use crate::gossip::{GossipStrategy, RandomFanout};
64use crate::observer::{DisconnectReason, HiveEvent, HiveObserver, SecurityViolationKind};
65use crate::peer::{
66 ConnectionStateGraph, FullStateCountSummary, HivePeer, IndirectPeer, PeerConnectionState,
67 PeerDegree, PeerManagerConfig, StateCountSummary,
68};
69use crate::peer_manager::PeerManager;
70use crate::relay::{
71 MessageId, RelayEnvelope, SeenMessageCache, DEFAULT_MAX_HOPS, DEFAULT_SEEN_TTL_MS,
72 RELAY_ENVELOPE_MARKER,
73};
74use crate::security::{
75 KeyExchangeMessage, MeshEncryptionKey, PeerEncryptedMessage, PeerSessionManager, SessionState,
76};
77use crate::sync::crdt::{EventType, PeripheralType};
78use crate::sync::delta::{DeltaEncoder, DeltaStats};
79use crate::sync::delta_document::{DeltaDocument, Operation};
80use crate::NodeId;
81
82#[cfg(feature = "std")]
83use crate::observer::ObserverManager;
84
/// Configuration for a `HiveMesh` instance.
///
/// Create one with [`HiveMeshConfig::new`] and adjust it through the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct HiveMeshConfig {
    /// Identifier of the local node.
    pub node_id: NodeId,

    /// Callsign of the local node; handed to `DocumentSync` when the mesh is built.
    pub callsign: String,

    /// Mesh identifier. Used for the advertised device name and as an input
    /// when deriving the mesh encryption key.
    pub mesh_id: String,

    /// Peripheral type reported by this node (defaults to `SoldierSensor`).
    pub peripheral_type: PeripheralType,

    /// Settings forwarded to the `PeerManager` and connection graph.
    pub peer_config: PeerManagerConfig,

    /// Sync interval in milliseconds (default 5000).
    // NOTE(review): the consumer of this value is outside this section.
    pub sync_interval_ms: u64,

    /// Whether events are broadcast automatically (default `true`).
    // NOTE(review): the consumer of this flag is outside this section.
    pub auto_broadcast_events: bool,

    /// Optional 32-byte shared secret. When present, `HiveMesh::new` derives
    /// a mesh-wide encryption key from it and the mesh ID.
    pub encryption_secret: Option<[u8; 32]>,

    /// When `true` *and* an encryption key is configured, incoming documents
    /// that are not encrypted are rejected.
    pub strict_encryption: bool,

    /// Whether received relay envelopes are re-forwarded to other peers.
    pub enable_relay: bool,

    /// Hop limit applied to relay envelopes created by this node.
    pub max_relay_hops: u8,

    /// Fanout handed to the gossip strategy (default 2).
    pub relay_fanout: usize,

    /// TTL, in milliseconds, of entries in the seen-message cache.
    pub seen_cache_ttl_ms: u64,
}
152
153impl HiveMeshConfig {
154 pub fn new(node_id: NodeId, callsign: &str, mesh_id: &str) -> Self {
156 Self {
157 node_id,
158 callsign: callsign.into(),
159 mesh_id: mesh_id.into(),
160 peripheral_type: PeripheralType::SoldierSensor,
161 peer_config: PeerManagerConfig::with_mesh_id(mesh_id),
162 sync_interval_ms: 5000,
163 auto_broadcast_events: true,
164 encryption_secret: None,
165 strict_encryption: false,
166 enable_relay: false,
167 max_relay_hops: DEFAULT_MAX_HOPS,
168 relay_fanout: 2,
169 seen_cache_ttl_ms: DEFAULT_SEEN_TTL_MS,
170 }
171 }
172
173 pub fn with_encryption(mut self, secret: [u8; 32]) -> Self {
178 self.encryption_secret = Some(secret);
179 self
180 }
181
182 pub fn with_peripheral_type(mut self, ptype: PeripheralType) -> Self {
184 self.peripheral_type = ptype;
185 self
186 }
187
188 pub fn with_sync_interval(mut self, interval_ms: u64) -> Self {
190 self.sync_interval_ms = interval_ms;
191 self
192 }
193
194 pub fn with_peer_timeout(mut self, timeout_ms: u64) -> Self {
196 self.peer_config.peer_timeout_ms = timeout_ms;
197 self
198 }
199
200 pub fn with_max_peers(mut self, max: usize) -> Self {
202 self.peer_config.max_peers = max;
203 self
204 }
205
206 pub fn with_strict_encryption(mut self) -> Self {
214 self.strict_encryption = true;
215 self
216 }
217
218 pub fn with_relay(mut self) -> Self {
223 self.enable_relay = true;
224 self
225 }
226
227 pub fn with_max_relay_hops(mut self, max_hops: u8) -> Self {
231 self.max_relay_hops = max_hops;
232 self
233 }
234
235 pub fn with_relay_fanout(mut self, fanout: usize) -> Self {
239 self.relay_fanout = fanout.max(1);
240 self
241 }
242
243 pub fn with_seen_cache_ttl(mut self, ttl_ms: u64) -> Self {
247 self.seen_cache_ttl_ms = ttl_ms;
248 self
249 }
250}
251
/// Facade that ties together peer management, document sync, relay
/// de-duplication, encryption and observer notification for one mesh node.
///
/// Mutable pieces are wrapped in `std::sync::Mutex` or atomics so that most
/// methods can take `&self`.
#[cfg(feature = "std")]
pub struct HiveMesh {
    /// Configuration supplied to `HiveMesh::new`.
    config: HiveMeshConfig,

    /// Tracks discovered and connected peers.
    peer_manager: PeerManager,

    /// Local document state (counters, peripheral, emergency, chat).
    document_sync: DocumentSync,

    /// Registered observers that receive `HiveEvent`s.
    observers: ObserverManager,

    /// Initialised to 0 in `new`.
    // NOTE(review): timestamps elsewhere in this type are `u64` milliseconds
    // while this is an `AtomicU32` — confirm how the value is narrowed where
    // it is written (outside this section).
    last_sync_ms: std::sync::atomic::AtomicU32,

    /// Initialised to 0 in `new`.
    // NOTE(review): same `u32` vs `u64` width question as `last_sync_ms`.
    last_cleanup_ms: std::sync::atomic::AtomicU32,

    /// Mesh-wide encryption key; `None` while encryption is disabled.
    encryption_key: Option<MeshEncryptionKey>,

    /// Per-peer E2EE session manager; `None` until `enable_peer_e2ee` is called.
    peer_sessions: std::sync::Mutex<Option<PeerSessionManager>>,

    /// Connection state of direct peers plus indirect peers learned via relays.
    connection_graph: std::sync::Mutex<ConnectionStateGraph>,

    /// Cache of relay message IDs already seen, used to drop duplicates.
    seen_cache: std::sync::Mutex<SeenMessageCache>,

    /// Strategy used to pick relay targets among connected peers.
    gossip_strategy: Box<dyn GossipStrategy>,

    /// Per-peer delta-sync bookkeeping and statistics.
    delta_encoder: std::sync::Mutex<DeltaEncoder>,
}
297
298#[cfg(feature = "std")]
299impl HiveMesh {
300 pub fn new(config: HiveMeshConfig) -> Self {
302 let peer_manager = PeerManager::new(config.node_id, config.peer_config.clone());
303 let document_sync = DocumentSync::with_peripheral_type(
304 config.node_id,
305 &config.callsign,
306 config.peripheral_type,
307 );
308
309 let encryption_key = config
311 .encryption_secret
312 .map(|secret| MeshEncryptionKey::from_shared_secret(&config.mesh_id, &secret));
313
314 let connection_graph = ConnectionStateGraph::with_config(
316 config.peer_config.rssi_degraded_threshold,
317 config.peer_config.lost_timeout_ms,
318 );
319
320 let seen_cache = SeenMessageCache::with_ttl(config.seen_cache_ttl_ms);
322
323 let gossip_strategy: Box<dyn GossipStrategy> =
325 Box::new(RandomFanout::new(config.relay_fanout));
326
327 let delta_encoder = DeltaEncoder::new(config.node_id);
329
330 Self {
331 config,
332 peer_manager,
333 document_sync,
334 observers: ObserverManager::new(),
335 last_sync_ms: std::sync::atomic::AtomicU32::new(0),
336 last_cleanup_ms: std::sync::atomic::AtomicU32::new(0),
337 encryption_key,
338 peer_sessions: std::sync::Mutex::new(None),
339 connection_graph: std::sync::Mutex::new(connection_graph),
340 seen_cache: std::sync::Mutex::new(seen_cache),
341 gossip_strategy,
342 delta_encoder: std::sync::Mutex::new(delta_encoder),
343 }
344 }
345
    /// Returns `true` when a mesh encryption key is currently configured.
    pub fn is_encryption_enabled(&self) -> bool {
        self.encryption_key.is_some()
    }
352
353 pub fn is_strict_encryption_enabled(&self) -> bool {
357 self.config.strict_encryption && self.encryption_key.is_some()
358 }
359
360 pub fn enable_encryption(&mut self, secret: &[u8; 32]) {
365 self.encryption_key = Some(MeshEncryptionKey::from_shared_secret(
366 &self.config.mesh_id,
367 secret,
368 ));
369 }
370
    /// Disables mesh encryption by discarding the current key.
    pub fn disable_encryption(&mut self) {
        self.encryption_key = None;
    }
375
376 fn encrypt_document(&self, plaintext: &[u8]) -> Vec<u8> {
381 match &self.encryption_key {
382 Some(key) => {
383 match key.encrypt_to_bytes(plaintext) {
385 Ok(ciphertext) => {
386 let mut buf = Vec::with_capacity(2 + ciphertext.len());
387 buf.push(ENCRYPTED_MARKER);
388 buf.push(0x00); buf.extend_from_slice(&ciphertext);
390 buf
391 }
392 Err(e) => {
393 log::error!("Encryption failed: {}", e);
394 plaintext.to_vec()
396 }
397 }
398 }
399 None => plaintext.to_vec(),
400 }
401 }
402
403 fn decrypt_document<'a>(
411 &self,
412 data: &'a [u8],
413 source_hint: Option<&str>,
414 ) -> Option<std::borrow::Cow<'a, [u8]>> {
415 if data.len() >= 2 && data[0] == ENCRYPTED_MARKER {
417 let _reserved = data[1];
419 let encrypted_payload = &data[2..];
420
421 match &self.encryption_key {
422 Some(key) => match key.decrypt_from_bytes(encrypted_payload) {
423 Ok(plaintext) => Some(std::borrow::Cow::Owned(plaintext)),
424 Err(e) => {
425 log::warn!("Decryption failed (wrong key or corrupted): {}", e);
426 self.notify(HiveEvent::SecurityViolation {
427 kind: SecurityViolationKind::DecryptionFailed,
428 source: source_hint.map(String::from),
429 });
430 None
431 }
432 },
433 None => {
434 log::warn!("Received encrypted document but encryption not enabled");
435 None
436 }
437 }
438 } else {
439 if self.config.strict_encryption && self.encryption_key.is_some() {
442 log::warn!(
443 "Rejected unencrypted document in strict encryption mode (source: {:?})",
444 source_hint
445 );
446 self.notify(HiveEvent::SecurityViolation {
447 kind: SecurityViolationKind::UnencryptedInStrictMode,
448 source: source_hint.map(String::from),
449 });
450 None
451 } else {
452 Some(std::borrow::Cow::Borrowed(data))
454 }
455 }
456 }
457
    /// Returns whether relay forwarding is enabled in the configuration.
    pub fn is_relay_enabled(&self) -> bool {
        self.config.enable_relay
    }
464
    /// Turns relay forwarding on.
    pub fn enable_relay(&mut self) {
        self.config.enable_relay = true;
    }
469
    /// Turns relay forwarding off.
    pub fn disable_relay(&mut self) {
        self.config.enable_relay = false;
    }
474
475 pub fn has_seen_message(&self, message_id: &MessageId) -> bool {
479 self.seen_cache.lock().unwrap().has_seen(message_id)
480 }
481
482 pub fn mark_message_seen(&self, message_id: MessageId, origin: NodeId, now_ms: u64) -> bool {
486 self.seen_cache
487 .lock()
488 .unwrap()
489 .check_and_mark(message_id, origin, now_ms)
490 }
491
492 pub fn seen_cache_size(&self) -> usize {
494 self.seen_cache.lock().unwrap().len()
495 }
496
497 pub fn clear_seen_cache(&self) {
499 self.seen_cache.lock().unwrap().clear();
500 }
501
502 pub fn wrap_for_relay(&self, payload: Vec<u8>) -> Vec<u8> {
507 let envelope = RelayEnvelope::broadcast(self.config.node_id, payload)
508 .with_max_hops(self.config.max_relay_hops);
509 envelope.encode()
510 }
511
512 pub fn get_relay_targets(&self, exclude_peer: Option<NodeId>) -> Vec<HivePeer> {
517 let connected = self.peer_manager.get_connected_peers();
518 let filtered: Vec<_> = if let Some(exclude) = exclude_peer {
519 connected
520 .into_iter()
521 .filter(|p| p.node_id != exclude)
522 .collect()
523 } else {
524 connected
525 };
526
527 self.gossip_strategy
528 .select_peers(&filtered)
529 .into_iter()
530 .cloned()
531 .collect()
532 }
533
534 pub fn process_relay_envelope(
544 &self,
545 data: &[u8],
546 source_peer: NodeId,
547 now_ms: u64,
548 ) -> Option<RelayDecision> {
549 let envelope = RelayEnvelope::decode(data)?;
551
552 if envelope.origin_node != source_peer && envelope.origin_node != self.node_id() {
555 let is_new = self.connection_graph.lock().unwrap().on_relay_received(
556 source_peer,
557 envelope.origin_node,
558 envelope.hop_count,
559 now_ms,
560 );
561
562 if is_new {
563 log::debug!(
564 "Discovered indirect peer {:08X} via {:08X} ({} hops)",
565 envelope.origin_node.as_u32(),
566 source_peer.as_u32(),
567 envelope.hop_count
568 );
569 }
570 }
571
572 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
574 let stats = self
576 .seen_cache
577 .lock()
578 .unwrap()
579 .get_stats(&envelope.message_id);
580 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
581
582 self.notify(HiveEvent::DuplicateMessageDropped {
583 origin_node: envelope.origin_node,
584 seen_count,
585 });
586
587 log::debug!(
588 "Dropped duplicate message {} from {:08X} (seen {} times)",
589 envelope.message_id,
590 envelope.origin_node.as_u32(),
591 seen_count
592 );
593 return None;
594 }
595
596 if !envelope.can_relay() {
598 self.notify(HiveEvent::MessageTtlExpired {
599 origin_node: envelope.origin_node,
600 hop_count: envelope.hop_count,
601 });
602
603 log::debug!(
604 "Message {} from {:08X} TTL expired at hop {}",
605 envelope.message_id,
606 envelope.origin_node.as_u32(),
607 envelope.hop_count
608 );
609
610 return Some(RelayDecision {
612 payload: envelope.payload,
613 origin_node: envelope.origin_node,
614 hop_count: envelope.hop_count,
615 should_relay: false,
616 relay_envelope: None,
617 });
618 }
619
620 let should_relay = self.config.enable_relay;
622 let relay_envelope = if should_relay {
623 envelope.relay() } else {
625 None
626 };
627
628 Some(RelayDecision {
629 payload: envelope.payload,
630 origin_node: envelope.origin_node,
631 hop_count: envelope.hop_count,
632 should_relay,
633 relay_envelope,
634 })
635 }
636
637 pub fn build_relay_document(&self) -> Vec<u8> {
642 let doc = self.build_document(); self.wrap_for_relay(doc)
644 }
645
646 pub fn register_peer_for_delta(&self, peer_id: &NodeId) {
653 let mut encoder = self.delta_encoder.lock().unwrap();
654 encoder.add_peer(peer_id);
655 log::debug!(
656 "Registered peer {:08X} for delta sync tracking",
657 peer_id.as_u32()
658 );
659 }
660
661 pub fn unregister_peer_for_delta(&self, peer_id: &NodeId) {
665 let mut encoder = self.delta_encoder.lock().unwrap();
666 encoder.remove_peer(peer_id);
667 log::debug!(
668 "Unregistered peer {:08X} from delta sync tracking",
669 peer_id.as_u32()
670 );
671 }
672
673 pub fn reset_peer_delta_state(&self, peer_id: &NodeId) {
678 let mut encoder = self.delta_encoder.lock().unwrap();
679 encoder.reset_peer(peer_id);
680 log::debug!("Reset delta sync state for peer {:08X}", peer_id.as_u32());
681 }
682
683 pub fn record_delta_sent(&self, peer_id: &NodeId, bytes: usize) {
685 let mut encoder = self.delta_encoder.lock().unwrap();
686 encoder.record_sent(peer_id, bytes);
687 }
688
689 pub fn record_delta_received(&self, peer_id: &NodeId, bytes: usize, timestamp: u64) {
691 let mut encoder = self.delta_encoder.lock().unwrap();
692 encoder.record_received(peer_id, bytes, timestamp);
693 }
694
695 pub fn delta_stats(&self) -> DeltaStats {
700 self.delta_encoder.lock().unwrap().stats()
701 }
702
703 pub fn peer_delta_stats(&self, peer_id: &NodeId) -> Option<(u64, u64, u32)> {
707 let encoder = self.delta_encoder.lock().unwrap();
708 encoder
709 .get_peer_state(peer_id)
710 .map(|state| (state.bytes_sent, state.bytes_received, state.sync_count))
711 }
712
713 pub fn build_delta_document_for_peer(&self, peer_id: &NodeId, now_ms: u64) -> Option<Vec<u8>> {
721 let mut all_operations: Vec<Operation> = Vec::new();
723
724 for (node_id_u32, count) in self.document_sync.counter_entries() {
727 all_operations.push(Operation::IncrementCounter {
728 counter_id: 0, node_id: NodeId::new(node_id_u32),
730 amount: count,
731 timestamp: count, });
733 }
734
735 let peripheral = self.document_sync.peripheral_snapshot();
738 let peripheral_timestamp = peripheral
739 .last_event
740 .as_ref()
741 .map(|e| e.timestamp)
742 .unwrap_or(1); all_operations.push(Operation::UpdatePeripheral {
744 peripheral,
745 timestamp: peripheral_timestamp,
746 });
747
748 if let Some(emergency) = self.document_sync.emergency_snapshot() {
750 let source_node = NodeId::new(emergency.source_node());
751 let timestamp = emergency.timestamp();
752
753 all_operations.push(Operation::SetEmergency {
755 source_node,
756 timestamp,
757 known_peers: emergency.all_nodes(),
758 });
759
760 for acked_node in emergency.acked_nodes() {
762 all_operations.push(Operation::AckEmergency {
763 node_id: NodeId::new(acked_node),
764 emergency_timestamp: timestamp,
765 });
766 }
767 }
768
769 let filtered_operations: Vec<Operation> = {
771 let encoder = self.delta_encoder.lock().unwrap();
772 if let Some(peer_state) = encoder.get_peer_state(peer_id) {
773 all_operations
774 .into_iter()
775 .filter(|op| peer_state.needs_send(&op.key(), op.timestamp()))
776 .collect()
777 } else {
778 all_operations
780 }
781 };
782
783 if filtered_operations.is_empty() {
785 return None;
786 }
787
788 {
790 let mut encoder = self.delta_encoder.lock().unwrap();
791 if let Some(peer_state) = encoder.get_peer_state_mut(peer_id) {
792 for op in &filtered_operations {
793 peer_state.mark_sent(&op.key(), op.timestamp());
794 }
795 }
796 }
797
798 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
800 for op in filtered_operations {
801 delta.add_operation(op);
802 }
803
804 let encoded = delta.encode();
806 let result = self.encrypt_document(&encoded);
807
808 {
810 let mut encoder = self.delta_encoder.lock().unwrap();
811 encoder.record_sent(peer_id, result.len());
812 }
813
814 Some(result)
815 }
816
817 pub fn build_full_delta_document(&self, now_ms: u64) -> Vec<u8> {
822 let mut delta = DeltaDocument::new(self.config.node_id, now_ms);
823
824 for (node_id_u32, count) in self.document_sync.counter_entries() {
826 delta.add_operation(Operation::IncrementCounter {
827 counter_id: 0,
828 node_id: NodeId::new(node_id_u32),
829 amount: count,
830 timestamp: now_ms,
831 });
832 }
833
834 let peripheral = self.document_sync.peripheral_snapshot();
836 let peripheral_timestamp = peripheral
837 .last_event
838 .as_ref()
839 .map(|e| e.timestamp)
840 .unwrap_or(now_ms);
841 delta.add_operation(Operation::UpdatePeripheral {
842 peripheral,
843 timestamp: peripheral_timestamp,
844 });
845
846 if let Some(emergency) = self.document_sync.emergency_snapshot() {
848 let source_node = NodeId::new(emergency.source_node());
849 let timestamp = emergency.timestamp();
850
851 delta.add_operation(Operation::SetEmergency {
852 source_node,
853 timestamp,
854 known_peers: emergency.all_nodes(),
855 });
856
857 for acked_node in emergency.acked_nodes() {
858 delta.add_operation(Operation::AckEmergency {
859 node_id: NodeId::new(acked_node),
860 emergency_timestamp: timestamp,
861 });
862 }
863 }
864
865 let encoded = delta.encode();
866 self.encrypt_document(&encoded)
867 }
868
869 fn process_delta_document_internal(
873 &self,
874 source_node: NodeId,
875 data: &[u8],
876 now_ms: u64,
877 relay_data: Option<Vec<u8>>,
878 origin_node: Option<NodeId>,
879 hop_count: u8,
880 ) -> Option<DataReceivedResult> {
881 let delta = DeltaDocument::decode(data)?;
883
884 if delta.origin_node == self.config.node_id {
886 return None;
887 }
888
889 let mut counter_changed = false;
891 let mut emergency_changed = false;
892 let mut is_emergency = false;
893 let mut is_ack = false;
894 let mut event_timestamp = 0u64;
895
896 for op in &delta.operations {
897 match op {
898 Operation::IncrementCounter {
899 node_id, amount, ..
900 } => {
901 let current = self.document_sync.counter_entries();
903 let current_value = current
904 .iter()
905 .find(|(id, _)| *id == node_id.as_u32())
906 .map(|(_, v)| *v)
907 .unwrap_or(0);
908
909 if *amount > current_value {
910 counter_changed = true;
913 }
914 }
915 Operation::UpdatePeripheral { timestamp, .. } => {
916 if *timestamp > event_timestamp {
918 event_timestamp = *timestamp;
919 }
920 }
921 Operation::SetEmergency { timestamp, .. } => {
922 is_emergency = true;
923 emergency_changed = true;
924 event_timestamp = *timestamp;
925 }
926 Operation::AckEmergency {
927 emergency_timestamp,
928 ..
929 } => {
930 is_ack = true;
931 emergency_changed = true;
932 if *emergency_timestamp > event_timestamp {
933 event_timestamp = *emergency_timestamp;
934 }
935 }
936 Operation::ClearEmergency {
937 emergency_timestamp,
938 } => {
939 emergency_changed = true;
940 if *emergency_timestamp > event_timestamp {
941 event_timestamp = *emergency_timestamp;
942 }
943 }
944 }
945 }
946
947 self.peer_manager.record_sync(source_node, now_ms);
949
950 {
952 let mut encoder = self.delta_encoder.lock().unwrap();
953 encoder.record_received(&source_node, data.len(), now_ms);
954 }
955
956 if is_emergency {
958 self.notify(HiveEvent::EmergencyReceived {
959 from_node: delta.origin_node,
960 });
961 } else if is_ack {
962 self.notify(HiveEvent::AckReceived {
963 from_node: delta.origin_node,
964 });
965 }
966
967 if counter_changed {
968 let total_count = self.document_sync.total_count();
969 self.notify(HiveEvent::DocumentSynced {
970 from_node: delta.origin_node,
971 total_count,
972 });
973 }
974
975 if relay_data.is_some() {
977 let relay_targets = self.get_relay_targets(Some(source_node));
978 self.notify(HiveEvent::MessageRelayed {
979 origin_node: origin_node.unwrap_or(delta.origin_node),
980 relay_count: relay_targets.len(),
981 hop_count,
982 });
983 }
984
985 Some(DataReceivedResult {
986 source_node: delta.origin_node,
987 is_emergency,
988 is_ack,
989 counter_changed,
990 emergency_changed,
991 total_count: self.document_sync.total_count(),
992 event_timestamp,
993 relay_data,
994 origin_node,
995 hop_count,
996 })
997 }
998
999 pub fn enable_peer_e2ee(&self) {
1007 let mut sessions = self.peer_sessions.lock().unwrap();
1008 if sessions.is_none() {
1009 *sessions = Some(PeerSessionManager::new(self.config.node_id));
1010 log::info!(
1011 "Per-peer E2EE enabled for node {:08X}",
1012 self.config.node_id.as_u32()
1013 );
1014 }
1015 }
1016
1017 pub fn disable_peer_e2ee(&self) {
1021 let mut sessions = self.peer_sessions.lock().unwrap();
1022 *sessions = None;
1023 log::info!("Per-peer E2EE disabled");
1024 }
1025
1026 pub fn is_peer_e2ee_enabled(&self) -> bool {
1028 self.peer_sessions.lock().unwrap().is_some()
1029 }
1030
1031 pub fn peer_e2ee_public_key(&self) -> Option<[u8; 32]> {
1035 self.peer_sessions
1036 .lock()
1037 .unwrap()
1038 .as_ref()
1039 .map(|s| s.our_public_key())
1040 }
1041
1042 pub fn initiate_peer_e2ee(&self, peer_node_id: NodeId, now_ms: u64) -> Option<Vec<u8>> {
1048 let mut sessions = self.peer_sessions.lock().unwrap();
1049 let session_mgr = sessions.as_mut()?;
1050
1051 let key_exchange = session_mgr.initiate_session(peer_node_id, now_ms);
1052 let mut buf = Vec::with_capacity(2 + 37);
1053 buf.push(KEY_EXCHANGE_MARKER);
1054 buf.push(0x00); buf.extend_from_slice(&key_exchange.encode());
1056
1057 log::info!(
1058 "Initiated E2EE session with peer {:08X}",
1059 peer_node_id.as_u32()
1060 );
1061 Some(buf)
1062 }
1063
1064 pub fn has_peer_e2ee_session(&self, peer_node_id: NodeId) -> bool {
1066 self.peer_sessions
1067 .lock()
1068 .unwrap()
1069 .as_ref()
1070 .is_some_and(|s| s.has_session(peer_node_id))
1071 }
1072
1073 pub fn peer_e2ee_session_state(&self, peer_node_id: NodeId) -> Option<SessionState> {
1075 self.peer_sessions
1076 .lock()
1077 .unwrap()
1078 .as_ref()
1079 .and_then(|s| s.session_state(peer_node_id))
1080 }
1081
1082 pub fn send_peer_e2ee(
1087 &self,
1088 peer_node_id: NodeId,
1089 plaintext: &[u8],
1090 now_ms: u64,
1091 ) -> Option<Vec<u8>> {
1092 let mut sessions = self.peer_sessions.lock().unwrap();
1093 let session_mgr = sessions.as_mut()?;
1094
1095 match session_mgr.encrypt_for_peer(peer_node_id, plaintext, now_ms) {
1096 Ok(encrypted) => {
1097 let mut buf = Vec::with_capacity(2 + encrypted.encode().len());
1098 buf.push(PEER_E2EE_MARKER);
1099 buf.push(0x00); buf.extend_from_slice(&encrypted.encode());
1101 Some(buf)
1102 }
1103 Err(e) => {
1104 log::warn!(
1105 "Failed to encrypt for peer {:08X}: {:?}",
1106 peer_node_id.as_u32(),
1107 e
1108 );
1109 None
1110 }
1111 }
1112 }
1113
1114 pub fn close_peer_e2ee(&self, peer_node_id: NodeId) {
1116 let mut sessions = self.peer_sessions.lock().unwrap();
1117 if let Some(session_mgr) = sessions.as_mut() {
1118 session_mgr.close_session(peer_node_id);
1119 self.notify(HiveEvent::PeerE2eeClosed { peer_node_id });
1120 log::info!(
1121 "Closed E2EE session with peer {:08X}",
1122 peer_node_id.as_u32()
1123 );
1124 }
1125 }
1126
1127 pub fn peer_e2ee_session_count(&self) -> usize {
1129 self.peer_sessions
1130 .lock()
1131 .unwrap()
1132 .as_ref()
1133 .map(|s| s.session_count())
1134 .unwrap_or(0)
1135 }
1136
1137 pub fn peer_e2ee_established_count(&self) -> usize {
1139 self.peer_sessions
1140 .lock()
1141 .unwrap()
1142 .as_ref()
1143 .map(|s| s.established_count())
1144 .unwrap_or(0)
1145 }
1146
1147 fn handle_key_exchange(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1152 if data.len() < 2 || data[0] != KEY_EXCHANGE_MARKER {
1153 return None;
1154 }
1155
1156 let payload = &data[2..];
1157 let msg = KeyExchangeMessage::decode(payload)?;
1158
1159 let mut sessions = self.peer_sessions.lock().unwrap();
1160 let session_mgr = sessions.as_mut()?;
1161
1162 let (response, established) = session_mgr.handle_key_exchange(&msg, now_ms)?;
1163
1164 if established {
1165 self.notify(HiveEvent::PeerE2eeEstablished {
1166 peer_node_id: msg.sender_node_id,
1167 });
1168 log::info!(
1169 "E2EE session established with peer {:08X}",
1170 msg.sender_node_id.as_u32()
1171 );
1172 }
1173
1174 let mut buf = Vec::with_capacity(2 + 37);
1176 buf.push(KEY_EXCHANGE_MARKER);
1177 buf.push(0x00);
1178 buf.extend_from_slice(&response.encode());
1179 Some(buf)
1180 }
1181
1182 fn handle_peer_e2ee_message(&self, data: &[u8], now_ms: u64) -> Option<Vec<u8>> {
1187 if data.len() < 2 || data[0] != PEER_E2EE_MARKER {
1188 return None;
1189 }
1190
1191 let payload = &data[2..];
1192 let msg = PeerEncryptedMessage::decode(payload)?;
1193
1194 let mut sessions = self.peer_sessions.lock().unwrap();
1195 let session_mgr = sessions.as_mut()?;
1196
1197 match session_mgr.decrypt_from_peer(&msg, now_ms) {
1198 Ok(plaintext) => {
1199 self.notify(HiveEvent::PeerE2eeMessageReceived {
1201 from_node: msg.sender_node_id,
1202 data: plaintext.clone(),
1203 });
1204 Some(plaintext)
1205 }
1206 Err(e) => {
1207 log::warn!(
1208 "Failed to decrypt E2EE message from {:08X}: {:?}",
1209 msg.sender_node_id.as_u32(),
1210 e
1211 );
1212 None
1213 }
1214 }
1215 }
1216
    /// Returns the local node's identifier.
    pub fn node_id(&self) -> NodeId {
        self.config.node_id
    }
1223
    /// Returns the local node's callsign.
    pub fn callsign(&self) -> &str {
        &self.config.callsign
    }
1228
    /// Returns the identifier of the mesh this node belongs to.
    pub fn mesh_id(&self) -> &str {
        &self.config.mesh_id
    }
1233
1234 pub fn device_name(&self) -> String {
1236 format!(
1237 "HIVE_{}-{:08X}",
1238 self.config.mesh_id,
1239 self.config.node_id.as_u32()
1240 )
1241 }
1242
    /// Registers `observer` with the observer manager.
    pub fn add_observer(&self, observer: Arc<dyn HiveObserver>) {
        self.observers.add(observer);
    }
1249
    /// Removes a previously registered `observer` from the observer manager.
    pub fn remove_observer(&self, observer: &Arc<dyn HiveObserver>) {
        self.observers.remove(observer);
    }
1254
1255 pub fn send_emergency(&self, timestamp: u64) -> Vec<u8> {
1262 let data = self.document_sync.send_emergency(timestamp);
1263 self.notify(HiveEvent::MeshStateChanged {
1264 peer_count: self.peer_manager.peer_count(),
1265 connected_count: self.peer_manager.connected_count(),
1266 });
1267 self.encrypt_document(&data)
1268 }
1269
1270 pub fn send_ack(&self, timestamp: u64) -> Vec<u8> {
1275 let data = self.document_sync.send_ack(timestamp);
1276 self.notify(HiveEvent::MeshStateChanged {
1277 peer_count: self.peer_manager.peer_count(),
1278 connected_count: self.peer_manager.connected_count(),
1279 });
1280 self.encrypt_document(&data)
1281 }
1282
    /// Clears the current event in the local document state.
    pub fn clear_event(&self) {
        self.document_sync.clear_event();
    }
1287
    /// Returns whether the local document state reports an active emergency
    /// event (delegates to `DocumentSync::is_emergency_active`).
    pub fn is_emergency_active(&self) -> bool {
        self.document_sync.is_emergency_active()
    }
1292
    /// Returns whether the local document state reports an active
    /// acknowledgement event (delegates to `DocumentSync::is_ack_active`).
    pub fn is_ack_active(&self) -> bool {
        self.document_sync.is_ack_active()
    }
1297
    /// Returns the current event type from the local document state, if any.
    pub fn current_event(&self) -> Option<EventType> {
        self.document_sync.current_event()
    }
1302
1303 pub fn start_emergency(&self, timestamp: u64, known_peers: &[u32]) -> Vec<u8> {
1312 let data = self.document_sync.start_emergency(timestamp, known_peers);
1313 self.notify(HiveEvent::MeshStateChanged {
1314 peer_count: self.peer_manager.peer_count(),
1315 connected_count: self.peer_manager.connected_count(),
1316 });
1317 self.encrypt_document(&data)
1318 }
1319
1320 pub fn start_emergency_with_known_peers(&self, timestamp: u64) -> Vec<u8> {
1324 let peers: Vec<u32> = self
1325 .peer_manager
1326 .get_peers()
1327 .iter()
1328 .map(|p| p.node_id.as_u32())
1329 .collect();
1330 self.start_emergency(timestamp, &peers)
1331 }
1332
1333 pub fn ack_emergency(&self, timestamp: u64) -> Option<Vec<u8>> {
1338 let result = self.document_sync.ack_emergency(timestamp);
1339 if result.is_some() {
1340 self.notify(HiveEvent::MeshStateChanged {
1341 peer_count: self.peer_manager.peer_count(),
1342 connected_count: self.peer_manager.connected_count(),
1343 });
1344 }
1345 result.map(|data| self.encrypt_document(&data))
1346 }
1347
    /// Clears the emergency in the local document state.
    pub fn clear_emergency(&self) {
        self.document_sync.clear_emergency();
    }
1352
    /// Returns whether the local document state holds an active emergency
    /// (delegates to `DocumentSync::has_active_emergency`).
    pub fn has_active_emergency(&self) -> bool {
        self.document_sync.has_active_emergency()
    }
1357
    /// Returns the emergency status tuple reported by `DocumentSync`, or
    /// `None` when it reports none.
    // NOTE(review): presumably (source node, timestamp, acked count, pending
    // count) — confirm the field order against `DocumentSync::get_emergency_status`.
    pub fn get_emergency_status(&self) -> Option<(u32, u64, usize, usize)> {
        self.document_sync.get_emergency_status()
    }
1364
    /// Returns whether the peer with raw node ID `peer_id` has acknowledged
    /// the emergency, as reported by `DocumentSync`.
    pub fn has_peer_acked(&self, peer_id: u32) -> bool {
        self.document_sync.has_peer_acked(peer_id)
    }
1369
    /// Returns whether all peers have acknowledged the emergency, as reported
    /// by `DocumentSync`.
    pub fn all_peers_acked(&self) -> bool {
        self.document_sync.all_peers_acked()
    }
1374
1375 pub fn send_chat(&self, sender: &str, text: &str, timestamp: u64) -> Option<Vec<u8>> {
1385 if self.document_sync.add_chat_message(sender, text, timestamp) {
1386 Some(self.encrypt_document(&self.build_document()))
1387 } else {
1388 None
1389 }
1390 }
1391
1392 pub fn send_chat_reply(
1400 &self,
1401 sender: &str,
1402 text: &str,
1403 reply_to_node: u32,
1404 reply_to_timestamp: u64,
1405 timestamp: u64,
1406 ) -> Option<Vec<u8>> {
1407 if self.document_sync.add_chat_reply(
1408 sender,
1409 text,
1410 reply_to_node,
1411 reply_to_timestamp,
1412 timestamp,
1413 ) {
1414 Some(self.encrypt_document(&self.build_document()))
1415 } else {
1416 None
1417 }
1418 }
1419
    /// Returns the number of chat messages held in the local document.
    pub fn chat_count(&self) -> usize {
        self.document_sync.chat_count()
    }
1424
    /// Returns the chat messages selected by `DocumentSync` for
    /// `since_timestamp`.
    // NOTE(review): the tuple is presumably (origin node, timestamp, sender,
    // text, reply-to node, reply-to timestamp), and whether the bound is
    // inclusive is decided by `DocumentSync` — confirm both there.
    pub fn chat_messages_since(
        &self,
        since_timestamp: u64,
    ) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.chat_messages_since(since_timestamp)
    }
1434
    /// Returns every chat message held in the local document.
    // NOTE(review): the tuple is presumably (origin node, timestamp, sender,
    // text, reply-to node, reply-to timestamp) — confirm against `DocumentSync`.
    pub fn all_chat_messages(&self) -> Vec<(u32, u64, String, String, u32, u64)> {
        self.document_sync.all_chat_messages()
    }
1441
1442 pub fn on_ble_discovered(
1448 &self,
1449 identifier: &str,
1450 name: Option<&str>,
1451 rssi: i8,
1452 mesh_id: Option<&str>,
1453 now_ms: u64,
1454 ) -> Option<HivePeer> {
1455 let (node_id, is_new) = self
1456 .peer_manager
1457 .on_discovered(identifier, name, rssi, mesh_id, now_ms)?;
1458
1459 let peer = self.peer_manager.get_peer(node_id)?;
1460
1461 {
1463 let mut graph = self.connection_graph.lock().unwrap();
1464 graph.on_discovered(
1465 node_id,
1466 identifier.to_string(),
1467 name.map(|s| s.to_string()),
1468 mesh_id.map(|s| s.to_string()),
1469 rssi,
1470 now_ms,
1471 );
1472 }
1473
1474 if is_new {
1475 self.notify(HiveEvent::PeerDiscovered { peer: peer.clone() });
1476 self.notify_mesh_state_changed();
1477 }
1478
1479 Some(peer)
1480 }
1481
1482 pub fn on_ble_connected(&self, identifier: &str, now_ms: u64) -> Option<NodeId> {
1486 let node_id = self.peer_manager.on_connected(identifier, now_ms)?;
1487
1488 {
1490 let mut graph = self.connection_graph.lock().unwrap();
1491 graph.on_connected(node_id, now_ms);
1492 }
1493
1494 self.notify(HiveEvent::PeerConnected { node_id });
1495 self.notify_mesh_state_changed();
1496 Some(node_id)
1497 }
1498
1499 pub fn on_ble_disconnected(
1501 &self,
1502 identifier: &str,
1503 reason: DisconnectReason,
1504 ) -> Option<NodeId> {
1505 let (node_id, observer_reason) = self.peer_manager.on_disconnected(identifier, reason)?;
1506
1507 {
1509 let mut graph = self.connection_graph.lock().unwrap();
1510 let platform_reason = match observer_reason {
1511 DisconnectReason::LocalRequest => crate::platform::DisconnectReason::LocalRequest,
1512 DisconnectReason::RemoteRequest => crate::platform::DisconnectReason::RemoteRequest,
1513 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
1514 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
1515 DisconnectReason::ConnectionFailed => {
1516 crate::platform::DisconnectReason::ConnectionFailed
1517 }
1518 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
1519 };
1520 let now_ms = std::time::SystemTime::now()
1521 .duration_since(std::time::UNIX_EPOCH)
1522 .map(|d| d.as_millis() as u64)
1523 .unwrap_or(0);
1524 graph.on_disconnected(node_id, platform_reason, now_ms);
1525 }
1526
1527 self.notify(HiveEvent::PeerDisconnected {
1528 node_id,
1529 reason: observer_reason,
1530 });
1531 self.notify_mesh_state_changed();
1532 Some(node_id)
1533 }
1534
1535 pub fn on_peer_disconnected(&self, node_id: NodeId, reason: DisconnectReason) {
1539 if self
1540 .peer_manager
1541 .on_disconnected_by_node_id(node_id, reason)
1542 {
1543 {
1545 let mut graph = self.connection_graph.lock().unwrap();
1546 let platform_reason = match reason {
1547 DisconnectReason::LocalRequest => {
1548 crate::platform::DisconnectReason::LocalRequest
1549 }
1550 DisconnectReason::RemoteRequest => {
1551 crate::platform::DisconnectReason::RemoteRequest
1552 }
1553 DisconnectReason::Timeout => crate::platform::DisconnectReason::Timeout,
1554 DisconnectReason::LinkLoss => crate::platform::DisconnectReason::LinkLoss,
1555 DisconnectReason::ConnectionFailed => {
1556 crate::platform::DisconnectReason::ConnectionFailed
1557 }
1558 DisconnectReason::Unknown => crate::platform::DisconnectReason::Unknown,
1559 };
1560 let now_ms = std::time::SystemTime::now()
1561 .duration_since(std::time::UNIX_EPOCH)
1562 .map(|d| d.as_millis() as u64)
1563 .unwrap_or(0);
1564 graph.on_disconnected(node_id, platform_reason, now_ms);
1565 }
1566
1567 self.notify(HiveEvent::PeerDisconnected { node_id, reason });
1568 self.notify_mesh_state_changed();
1569 }
1570 }
1571
1572 pub fn on_incoming_connection(&self, identifier: &str, node_id: NodeId, now_ms: u64) -> bool {
1576 let is_new = self
1577 .peer_manager
1578 .on_incoming_connection(identifier, node_id, now_ms);
1579
1580 {
1582 let mut graph = self.connection_graph.lock().unwrap();
1583 if is_new {
1584 graph.on_discovered(
1585 node_id,
1586 identifier.to_string(),
1587 None,
1588 Some(self.config.mesh_id.clone()),
1589 -50, now_ms,
1591 );
1592 }
1593 graph.on_connected(node_id, now_ms);
1594 }
1595
1596 if is_new {
1597 if let Some(peer) = self.peer_manager.get_peer(node_id) {
1598 self.notify(HiveEvent::PeerDiscovered { peer });
1599 }
1600 }
1601
1602 self.notify(HiveEvent::PeerConnected { node_id });
1603 self.notify_mesh_state_changed();
1604
1605 is_new
1606 }
1607
1608 pub fn on_ble_data_received(
1615 &self,
1616 identifier: &str,
1617 data: &[u8],
1618 now_ms: u64,
1619 ) -> Option<DataReceivedResult> {
1620 let node_id = self.peer_manager.get_node_id(identifier)?;
1622
1623 if data.len() >= 2 {
1625 match data[0] {
1626 KEY_EXCHANGE_MARKER => {
1627 let _response = self.handle_key_exchange(data, now_ms);
1629 return None;
1631 }
1632 PEER_E2EE_MARKER => {
1633 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1635 return None;
1637 }
1638 RELAY_ENVELOPE_MARKER => {
1639 return self
1641 .handle_relay_envelope_with_identifier(node_id, identifier, data, now_ms);
1642 }
1643 _ => {}
1644 }
1645 }
1646
1647 self.process_document_data_with_identifier(node_id, identifier, data, now_ms, None, None, 0)
1649 }
1650
1651 #[allow(clippy::too_many_arguments)]
1653 fn process_document_data_with_identifier(
1654 &self,
1655 source_node: NodeId,
1656 identifier: &str,
1657 data: &[u8],
1658 now_ms: u64,
1659 relay_data: Option<Vec<u8>>,
1660 origin_node: Option<NodeId>,
1661 hop_count: u8,
1662 ) -> Option<DataReceivedResult> {
1663 let decrypted = self.decrypt_document(data, Some(identifier))?;
1665
1666 if DeltaDocument::is_delta_document(&decrypted) {
1668 return self.process_delta_document_internal(
1669 source_node,
1670 &decrypted,
1671 now_ms,
1672 relay_data,
1673 origin_node,
1674 hop_count,
1675 );
1676 }
1677
1678 let result = self.document_sync.merge_document(&decrypted)?;
1680
1681 self.peer_manager.record_sync(source_node, now_ms);
1683
1684 if result.is_emergency() {
1686 self.notify(HiveEvent::EmergencyReceived {
1687 from_node: result.source_node,
1688 });
1689 } else if result.is_ack() {
1690 self.notify(HiveEvent::AckReceived {
1691 from_node: result.source_node,
1692 });
1693 }
1694
1695 if result.counter_changed {
1696 self.notify(HiveEvent::DocumentSynced {
1697 from_node: result.source_node,
1698 total_count: result.total_count,
1699 });
1700 }
1701
1702 if relay_data.is_some() {
1704 let relay_targets = self.get_relay_targets(Some(source_node));
1705 self.notify(HiveEvent::MessageRelayed {
1706 origin_node: origin_node.unwrap_or(result.source_node),
1707 relay_count: relay_targets.len(),
1708 hop_count,
1709 });
1710 }
1711
1712 Some(DataReceivedResult {
1713 source_node: result.source_node,
1714 is_emergency: result.is_emergency(),
1715 is_ack: result.is_ack(),
1716 counter_changed: result.counter_changed,
1717 emergency_changed: result.emergency_changed,
1718 total_count: result.total_count,
1719 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
1720 relay_data,
1721 origin_node,
1722 hop_count,
1723 })
1724 }
1725
1726 fn handle_relay_envelope_with_identifier(
1728 &self,
1729 source_node: NodeId,
1730 identifier: &str,
1731 data: &[u8],
1732 now_ms: u64,
1733 ) -> Option<DataReceivedResult> {
1734 let envelope = RelayEnvelope::decode(data)?;
1736
1737 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
1739 let stats = self
1740 .seen_cache
1741 .lock()
1742 .unwrap()
1743 .get_stats(&envelope.message_id);
1744 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
1745
1746 self.notify(HiveEvent::DuplicateMessageDropped {
1747 origin_node: envelope.origin_node,
1748 seen_count,
1749 });
1750 return None;
1751 }
1752
1753 let relay_data = if envelope.can_relay() && self.config.enable_relay {
1755 envelope.relay().map(|e| e.encode())
1756 } else {
1757 if !envelope.can_relay() {
1758 self.notify(HiveEvent::MessageTtlExpired {
1759 origin_node: envelope.origin_node,
1760 hop_count: envelope.hop_count,
1761 });
1762 }
1763 None
1764 };
1765
1766 self.process_document_data_with_identifier(
1768 source_node,
1769 identifier,
1770 &envelope.payload,
1771 now_ms,
1772 relay_data,
1773 Some(envelope.origin_node),
1774 envelope.hop_count,
1775 )
1776 }
1777
1778 pub fn on_ble_data_received_from_node(
1785 &self,
1786 node_id: NodeId,
1787 data: &[u8],
1788 now_ms: u64,
1789 ) -> Option<DataReceivedResult> {
1790 if data.len() >= 2 {
1792 match data[0] {
1793 KEY_EXCHANGE_MARKER => {
1794 let _response = self.handle_key_exchange(data, now_ms);
1795 return None;
1796 }
1797 PEER_E2EE_MARKER => {
1798 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1799 return None;
1800 }
1801 RELAY_ENVELOPE_MARKER => {
1802 return self.handle_relay_envelope(node_id, data, now_ms);
1804 }
1805 _ => {}
1806 }
1807 }
1808
1809 self.process_document_data(node_id, data, now_ms, None, None, 0)
1811 }
1812
1813 fn process_document_data(
1815 &self,
1816 source_node: NodeId,
1817 data: &[u8],
1818 now_ms: u64,
1819 relay_data: Option<Vec<u8>>,
1820 origin_node: Option<NodeId>,
1821 hop_count: u8,
1822 ) -> Option<DataReceivedResult> {
1823 let source_hint = format!("node:{:08X}", source_node.as_u32());
1825 let decrypted = self.decrypt_document(data, Some(&source_hint))?;
1826
1827 if DeltaDocument::is_delta_document(&decrypted) {
1829 return self.process_delta_document_internal(
1830 source_node,
1831 &decrypted,
1832 now_ms,
1833 relay_data,
1834 origin_node,
1835 hop_count,
1836 );
1837 }
1838
1839 let result = self.document_sync.merge_document(&decrypted)?;
1841
1842 self.peer_manager.record_sync(source_node, now_ms);
1844
1845 if result.is_emergency() {
1847 self.notify(HiveEvent::EmergencyReceived {
1848 from_node: result.source_node,
1849 });
1850 } else if result.is_ack() {
1851 self.notify(HiveEvent::AckReceived {
1852 from_node: result.source_node,
1853 });
1854 }
1855
1856 if result.counter_changed {
1857 self.notify(HiveEvent::DocumentSynced {
1858 from_node: result.source_node,
1859 total_count: result.total_count,
1860 });
1861 }
1862
1863 if relay_data.is_some() {
1865 let relay_targets = self.get_relay_targets(Some(source_node));
1866 self.notify(HiveEvent::MessageRelayed {
1867 origin_node: origin_node.unwrap_or(result.source_node),
1868 relay_count: relay_targets.len(),
1869 hop_count,
1870 });
1871 }
1872
1873 Some(DataReceivedResult {
1874 source_node: result.source_node,
1875 is_emergency: result.is_emergency(),
1876 is_ack: result.is_ack(),
1877 counter_changed: result.counter_changed,
1878 emergency_changed: result.emergency_changed,
1879 total_count: result.total_count,
1880 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
1881 relay_data,
1882 origin_node,
1883 hop_count,
1884 })
1885 }
1886
1887 fn handle_relay_envelope(
1889 &self,
1890 source_node: NodeId,
1891 data: &[u8],
1892 now_ms: u64,
1893 ) -> Option<DataReceivedResult> {
1894 let decision = self.process_relay_envelope(data, source_node, now_ms)?;
1896
1897 let relay_data = if decision.should_relay {
1899 decision.relay_data()
1900 } else {
1901 None
1902 };
1903
1904 self.process_document_data(
1906 source_node,
1907 &decision.payload,
1908 now_ms,
1909 relay_data,
1910 Some(decision.origin_node),
1911 decision.hop_count,
1912 )
1913 }
1914
1915 pub fn on_ble_data(
1924 &self,
1925 identifier: &str,
1926 data: &[u8],
1927 now_ms: u64,
1928 ) -> Option<DataReceivedResult> {
1929 if data.len() >= 2 {
1931 match data[0] {
1932 KEY_EXCHANGE_MARKER => {
1933 let _response = self.handle_key_exchange(data, now_ms);
1934 return None;
1935 }
1936 PEER_E2EE_MARKER => {
1937 let _plaintext = self.handle_peer_e2ee_message(data, now_ms);
1938 return None;
1939 }
1940 RELAY_ENVELOPE_MARKER => {
1941 return self.handle_relay_envelope_with_incoming(identifier, data, now_ms);
1943 }
1944 _ => {}
1945 }
1946 }
1947
1948 self.process_incoming_document(identifier, data, now_ms, None, None, 0)
1950 }
1951
1952 fn process_incoming_document(
1954 &self,
1955 identifier: &str,
1956 data: &[u8],
1957 now_ms: u64,
1958 relay_data: Option<Vec<u8>>,
1959 origin_node: Option<NodeId>,
1960 hop_count: u8,
1961 ) -> Option<DataReceivedResult> {
1962 let decrypted = self.decrypt_document(data, Some(identifier))?;
1964
1965 let result = self.document_sync.merge_document(&decrypted)?;
1967
1968 self.peer_manager.record_sync(result.source_node, now_ms);
1970
1971 self.peer_manager
1973 .on_incoming_connection(identifier, result.source_node, now_ms);
1974
1975 if result.is_emergency() {
1977 self.notify(HiveEvent::EmergencyReceived {
1978 from_node: result.source_node,
1979 });
1980 } else if result.is_ack() {
1981 self.notify(HiveEvent::AckReceived {
1982 from_node: result.source_node,
1983 });
1984 }
1985
1986 if result.counter_changed {
1987 self.notify(HiveEvent::DocumentSynced {
1988 from_node: result.source_node,
1989 total_count: result.total_count,
1990 });
1991 }
1992
1993 if relay_data.is_some() {
1995 let relay_targets = self.get_relay_targets(Some(result.source_node));
1996 self.notify(HiveEvent::MessageRelayed {
1997 origin_node: origin_node.unwrap_or(result.source_node),
1998 relay_count: relay_targets.len(),
1999 hop_count,
2000 });
2001 }
2002
2003 Some(DataReceivedResult {
2004 source_node: result.source_node,
2005 is_emergency: result.is_emergency(),
2006 is_ack: result.is_ack(),
2007 counter_changed: result.counter_changed,
2008 emergency_changed: result.emergency_changed,
2009 total_count: result.total_count,
2010 event_timestamp: result.event.as_ref().map(|e| e.timestamp).unwrap_or(0),
2011 relay_data,
2012 origin_node,
2013 hop_count,
2014 })
2015 }
2016
2017 fn handle_relay_envelope_with_incoming(
2019 &self,
2020 identifier: &str,
2021 data: &[u8],
2022 now_ms: u64,
2023 ) -> Option<DataReceivedResult> {
2024 let envelope = RelayEnvelope::decode(data)?;
2026
2027 if !self.mark_message_seen(envelope.message_id, envelope.origin_node, now_ms) {
2029 let stats = self
2031 .seen_cache
2032 .lock()
2033 .unwrap()
2034 .get_stats(&envelope.message_id);
2035 let seen_count = stats.map(|(_, count, _)| count).unwrap_or(1);
2036
2037 self.notify(HiveEvent::DuplicateMessageDropped {
2038 origin_node: envelope.origin_node,
2039 seen_count,
2040 });
2041 return None;
2042 }
2043
2044 let (should_relay, relay_data) = if envelope.can_relay() && self.config.enable_relay {
2046 let relay_env = envelope.relay();
2047 (true, relay_env.map(|e| e.encode()))
2048 } else {
2049 if !envelope.can_relay() {
2050 self.notify(HiveEvent::MessageTtlExpired {
2051 origin_node: envelope.origin_node,
2052 hop_count: envelope.hop_count,
2053 });
2054 }
2055 (false, None)
2056 };
2057
2058 self.process_incoming_document(
2060 identifier,
2061 &envelope.payload,
2062 now_ms,
2063 if should_relay { relay_data } else { None },
2064 Some(envelope.origin_node),
2065 envelope.hop_count,
2066 )
2067 }
2068
2069 pub fn tick(&self, now_ms: u64) -> Option<Vec<u8>> {
2079 use std::sync::atomic::Ordering;
2080
2081 let now_ms_32 = now_ms as u32;
2083
2084 let last_cleanup = self.last_cleanup_ms.load(Ordering::Relaxed);
2086 let cleanup_elapsed = now_ms_32.wrapping_sub(last_cleanup);
2087 if cleanup_elapsed >= self.config.peer_config.cleanup_interval_ms as u32 {
2088 self.last_cleanup_ms.store(now_ms_32, Ordering::Relaxed);
2089 let removed = self.peer_manager.cleanup_stale(now_ms);
2090 for node_id in &removed {
2091 self.notify(HiveEvent::PeerLost { node_id: *node_id });
2092 }
2093 if !removed.is_empty() {
2094 self.notify_mesh_state_changed();
2095 }
2096
2097 {
2099 let mut graph = self.connection_graph.lock().unwrap();
2100 let newly_lost = graph.tick(now_ms);
2101 graph.cleanup_lost(self.config.peer_config.peer_timeout_ms, now_ms);
2103 drop(graph);
2104
2105 for node_id in newly_lost {
2108 if !removed.contains(&node_id) {
2110 self.notify(HiveEvent::PeerLost { node_id });
2111 }
2112 }
2113 }
2114 }
2115
2116 let last_sync = self.last_sync_ms.load(Ordering::Relaxed);
2118 let sync_elapsed = now_ms_32.wrapping_sub(last_sync);
2119 if sync_elapsed >= self.config.sync_interval_ms as u32 {
2120 self.last_sync_ms.store(now_ms_32, Ordering::Relaxed);
2121 if self.peer_manager.connected_count() > 0 {
2123 let doc = self.document_sync.build_document();
2124 return Some(self.encrypt_document(&doc));
2125 }
2126 }
2127
2128 None
2129 }
2130
    /// Returns the peer list reported by the peer manager.
    pub fn get_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_peers()
    }
2137
    /// Returns the peers the peer manager reports as connected.
    pub fn get_connected_peers(&self) -> Vec<HivePeer> {
        self.peer_manager.get_connected_peers()
    }
2142
    /// Looks up one peer by node ID in the peer manager.
    ///
    /// Returns whatever the peer manager's lookup yields (`None` when it has no
    /// peer to return for `node_id`).
    pub fn get_peer(&self, node_id: NodeId) -> Option<HivePeer> {
        self.peer_manager.get_peer(node_id)
    }
2147
    /// Returns the peer manager's peer count.
    pub fn peer_count(&self) -> usize {
        self.peer_manager.peer_count()
    }
2152
    /// Returns the peer manager's count of connected peers.
    pub fn connected_count(&self) -> usize {
        self.peer_manager.connected_count()
    }
2157
    /// Asks the peer manager whether `device_mesh_id` matches this mesh.
    ///
    /// NOTE(review): how a `None` mesh ID is treated is decided by
    /// `PeerManager::matches_mesh` — confirm there.
    pub fn matches_mesh(&self, device_mesh_id: Option<&str>) -> bool {
        self.peer_manager.matches_mesh(device_mesh_id)
    }
2162
    /// Returns an owned copy of all entries held by the connection graph
    /// (`get_all_owned`).
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_connection_graph(&self) -> Vec<PeerConnectionState> {
        self.connection_graph.lock().unwrap().get_all_owned()
    }
2189
2190 pub fn get_peer_connection_state(&self, node_id: NodeId) -> Option<PeerConnectionState> {
2192 self.connection_graph
2193 .lock()
2194 .unwrap()
2195 .get_peer(node_id)
2196 .cloned()
2197 }
2198
2199 pub fn get_connected_states(&self) -> Vec<PeerConnectionState> {
2201 self.connection_graph
2202 .lock()
2203 .unwrap()
2204 .get_connected()
2205 .into_iter()
2206 .cloned()
2207 .collect()
2208 }
2209
2210 pub fn get_degraded_peers(&self) -> Vec<PeerConnectionState> {
2212 self.connection_graph
2213 .lock()
2214 .unwrap()
2215 .get_degraded()
2216 .into_iter()
2217 .cloned()
2218 .collect()
2219 }
2220
2221 pub fn get_recently_disconnected(
2225 &self,
2226 within_ms: u64,
2227 now_ms: u64,
2228 ) -> Vec<PeerConnectionState> {
2229 self.connection_graph
2230 .lock()
2231 .unwrap()
2232 .get_recently_disconnected(within_ms, now_ms)
2233 .into_iter()
2234 .cloned()
2235 .collect()
2236 }
2237
2238 pub fn get_lost_peers(&self) -> Vec<PeerConnectionState> {
2240 self.connection_graph
2241 .lock()
2242 .unwrap()
2243 .get_lost()
2244 .into_iter()
2245 .cloned()
2246 .collect()
2247 }
2248
    /// Returns the connection graph's per-state count summary (`state_counts`).
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_connection_state_counts(&self) -> StateCountSummary {
        self.connection_graph.lock().unwrap().state_counts()
    }
2253
    /// Returns an owned copy of the indirect peers tracked by the connection
    /// graph (`get_indirect_peers_owned`).
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_indirect_peers(&self) -> Vec<IndirectPeer> {
        self.connection_graph
            .lock()
            .unwrap()
            .get_indirect_peers_owned()
    }
2267
    /// Returns the degree the connection graph reports for `node_id`
    /// (`peer_degree`), or `None` when the graph yields none.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_peer_degree(&self, node_id: NodeId) -> Option<PeerDegree> {
        self.connection_graph.lock().unwrap().peer_degree(node_id)
    }
2277
    /// Returns the connection graph's full count summary (`full_state_counts`).
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_full_state_counts(&self) -> FullStateCountSummary {
        self.connection_graph.lock().unwrap().full_state_counts()
    }
2285
    /// Returns the paths to `node_id` known to the connection graph
    /// (`get_paths_to`).
    ///
    /// NOTE(review): each tuple is presumably `(via_node, hop_count)` — confirm
    /// against `ConnectionStateGraph::get_paths_to`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn get_paths_to_peer(&self, node_id: NodeId) -> Vec<(NodeId, u8)> {
        self.connection_graph.lock().unwrap().get_paths_to(node_id)
    }
2293
    /// Returns whether the connection graph reports `node_id` as known
    /// (`is_known`).
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn is_peer_known(&self, node_id: NodeId) -> bool {
        self.connection_graph.lock().unwrap().is_known(node_id)
    }
2298
    /// Returns the connection graph's count of indirect peers.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn indirect_peer_count(&self) -> usize {
        self.connection_graph.lock().unwrap().indirect_peer_count()
    }
2303
    /// Runs the connection graph's indirect-peer cleanup for time `now_ms` and
    /// returns the node IDs it reports.
    ///
    /// NOTE(review): the returned IDs are presumably the indirect peers that
    /// were removed — confirm against `ConnectionStateGraph::cleanup_indirect`.
    ///
    /// # Panics
    ///
    /// Panics if locking the connection graph fails (`lock().unwrap()`).
    pub fn cleanup_indirect_peers(&self, now_ms: u64) -> Vec<NodeId> {
        self.connection_graph
            .lock()
            .unwrap()
            .cleanup_indirect(now_ms)
    }
2314
    /// Returns the total count reported by the document sync layer.
    pub fn total_count(&self) -> u64 {
        self.document_sync.total_count()
    }
2319
    /// Returns the document version reported by the document sync layer.
    ///
    /// Returns the same value as [`Self::version`].
    pub fn document_version(&self) -> u32 {
        self.document_sync.version()
    }
2324
    /// Returns the document version reported by the document sync layer.
    ///
    /// Returns the same value as [`Self::document_version`].
    pub fn version(&self) -> u32 {
        self.document_sync.version()
    }
2329
    /// Forwards a new battery percentage to the document sync layer.
    pub fn update_health(&self, battery_percent: u8) {
        self.document_sync.update_health(battery_percent);
    }
2334
    /// Forwards a new activity value to the document sync layer.
    ///
    /// NOTE(review): the encoding of `activity` is defined by
    /// `DocumentSync::update_activity` — confirm there.
    pub fn update_activity(&self, activity: u8) {
        self.document_sync.update_activity(activity);
    }
2339
    /// Forwards battery percentage and activity together to the document sync
    /// layer in a single call.
    pub fn update_health_full(&self, battery_percent: u8, activity: u8) {
        self.document_sync
            .update_health_full(battery_percent, activity);
    }
2345
2346 pub fn build_document(&self) -> Vec<u8> {
2350 let doc = self.document_sync.build_document();
2351 self.encrypt_document(&doc)
2352 }
2353
    /// Returns the peers the peer manager reports as needing a sync at `now_ms`.
    pub fn peers_needing_sync(&self, now_ms: u64) -> Vec<HivePeer> {
        self.peer_manager.peers_needing_sync(now_ms)
    }
2358
    /// Passes `event` to the registered observers via `self.observers`.
    fn notify(&self, event: HiveEvent) {
        self.observers.notify(event);
    }
2364
2365 fn notify_mesh_state_changed(&self) {
2366 self.notify(HiveEvent::MeshStateChanged {
2367 peer_count: self.peer_manager.peer_count(),
2368 connected_count: self.peer_manager.connected_count(),
2369 });
2370 }
2371}
2372
/// Summary of a successfully merged incoming document, returned by the
/// data-received entry points.
#[derive(Debug, Clone)]
pub struct DataReceivedResult {
    /// Source node reported by the merge result (the node named in the document).
    pub source_node: NodeId,

    /// Whether the merge result reports an emergency.
    pub is_emergency: bool,

    /// Whether the merge result reports an ack.
    pub is_ack: bool,

    /// Whether the merge changed the counter; a `DocumentSynced` event is
    /// emitted when this is `true`.
    pub counter_changed: bool,

    /// `emergency_changed` flag copied from the merge result.
    pub emergency_changed: bool,

    /// Total count copied from the merge result.
    pub total_count: u64,

    /// Timestamp of the merge result's event, or 0 when it carries no event.
    pub event_timestamp: u64,

    /// Encoded relay envelope to forward to other peers, when the data arrived
    /// in a relay envelope that should be relayed further; `None` otherwise.
    pub relay_data: Option<Vec<u8>>,

    /// Origin node from the relay envelope; `None` for data received directly.
    pub origin_node: Option<NodeId>,

    /// Hop count from the relay envelope; 0 for data received directly.
    pub hop_count: u8,
}
2409
/// Outcome of inspecting a relay envelope: the inner payload plus the
/// information needed to decide whether and how to forward it.
#[derive(Debug, Clone)]
pub struct RelayDecision {
    /// Inner payload, processed locally as document data.
    pub payload: Vec<u8>,

    /// Origin node attached to the processed payload.
    pub origin_node: NodeId,

    /// Hop count attached to the processed payload.
    pub hop_count: u8,

    /// Whether the message should be forwarded to other peers.
    pub should_relay: bool,

    /// Envelope to forward, if any; encoded by [`RelayDecision::relay_data`].
    pub relay_envelope: Option<RelayEnvelope>,
}
2430
2431impl RelayDecision {
2432 pub fn relay_data(&self) -> Option<Vec<u8>> {
2436 self.relay_envelope.as_ref().map(|e| e.encode())
2437 }
2438}
2439
2440#[cfg(all(test, feature = "std"))]
2441mod tests {
2442 use super::*;
2443 use crate::observer::CollectingObserver;
2444
    /// Fixed timestamp used by the tests, in milliseconds since the Unix epoch
    /// (2024-01-15T00:00:00Z).
    const TEST_TIMESTAMP: u64 = 1705276800000;
2447
2448 fn create_mesh(node_id: u32, callsign: &str) -> HiveMesh {
2449 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST");
2450 HiveMesh::new(config)
2451 }
2452
2453 #[test]
2454 fn test_mesh_creation() {
2455 let mesh = create_mesh(0x12345678, "ALPHA-1");
2456
2457 assert_eq!(mesh.node_id().as_u32(), 0x12345678);
2458 assert_eq!(mesh.callsign(), "ALPHA-1");
2459 assert_eq!(mesh.mesh_id(), "TEST");
2460 assert_eq!(mesh.device_name(), "HIVE_TEST-12345678");
2461 }
2462
2463 #[test]
2464 fn test_peer_discovery() {
2465 let mesh = create_mesh(0x11111111, "ALPHA-1");
2466 let observer = Arc::new(CollectingObserver::new());
2467 mesh.add_observer(observer.clone());
2468
2469 let peer = mesh.on_ble_discovered(
2471 "device-uuid",
2472 Some("HIVE_TEST-22222222"),
2473 -65,
2474 Some("TEST"),
2475 1000,
2476 );
2477
2478 assert!(peer.is_some());
2479 let peer = peer.unwrap();
2480 assert_eq!(peer.node_id.as_u32(), 0x22222222);
2481
2482 let events = observer.events();
2484 assert!(events
2485 .iter()
2486 .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
2487 assert!(events
2488 .iter()
2489 .any(|e| matches!(e, HiveEvent::MeshStateChanged { .. })));
2490 }
2491
2492 #[test]
2493 fn test_connection_lifecycle() {
2494 let mesh = create_mesh(0x11111111, "ALPHA-1");
2495 let observer = Arc::new(CollectingObserver::new());
2496 mesh.add_observer(observer.clone());
2497
2498 mesh.on_ble_discovered(
2500 "device-uuid",
2501 Some("HIVE_TEST-22222222"),
2502 -65,
2503 Some("TEST"),
2504 1000,
2505 );
2506
2507 let node_id = mesh.on_ble_connected("device-uuid", 2000);
2508 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
2509 assert_eq!(mesh.connected_count(), 1);
2510
2511 let node_id = mesh.on_ble_disconnected("device-uuid", DisconnectReason::RemoteRequest);
2513 assert_eq!(node_id, Some(NodeId::new(0x22222222)));
2514 assert_eq!(mesh.connected_count(), 0);
2515
2516 let events = observer.events();
2518 assert!(events
2519 .iter()
2520 .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
2521 assert!(events
2522 .iter()
2523 .any(|e| matches!(e, HiveEvent::PeerDisconnected { .. })));
2524 }
2525
2526 #[test]
2527 fn test_emergency_flow() {
2528 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2529 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2530
2531 let observer2 = Arc::new(CollectingObserver::new());
2532 mesh2.add_observer(observer2.clone());
2533
2534 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
2536 assert!(mesh1.is_emergency_active());
2537
2538 let result =
2540 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2541
2542 assert!(result.is_some());
2543 let result = result.unwrap();
2544 assert!(result.is_emergency);
2545 assert_eq!(result.source_node.as_u32(), 0x11111111);
2546
2547 let events = observer2.events();
2549 assert!(events
2550 .iter()
2551 .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
2552 }
2553
2554 #[test]
2555 fn test_ack_flow() {
2556 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2557 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2558
2559 let observer2 = Arc::new(CollectingObserver::new());
2560 mesh2.add_observer(observer2.clone());
2561
2562 let doc = mesh1.send_ack(TEST_TIMESTAMP);
2564 assert!(mesh1.is_ack_active());
2565
2566 let result =
2568 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2569
2570 assert!(result.is_some());
2571 let result = result.unwrap();
2572 assert!(result.is_ack);
2573
2574 let events = observer2.events();
2576 assert!(events
2577 .iter()
2578 .any(|e| matches!(e, HiveEvent::AckReceived { .. })));
2579 }
2580
2581 #[test]
2582 fn test_tick_cleanup() {
2583 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
2584 .with_peer_timeout(10_000);
2585 let mesh = HiveMesh::new(config);
2586
2587 let observer = Arc::new(CollectingObserver::new());
2588 mesh.add_observer(observer.clone());
2589
2590 mesh.on_ble_discovered(
2592 "device-uuid",
2593 Some("HIVE_TEST-22222222"),
2594 -65,
2595 Some("TEST"),
2596 1000,
2597 );
2598 assert_eq!(mesh.peer_count(), 1);
2599
2600 mesh.tick(5000);
2602 assert_eq!(mesh.peer_count(), 1);
2603
2604 mesh.tick(20000);
2606 assert_eq!(mesh.peer_count(), 0);
2607
2608 let events = observer.events();
2610 assert!(events
2611 .iter()
2612 .any(|e| matches!(e, HiveEvent::PeerLost { .. })));
2613 }
2614
2615 #[test]
2616 fn test_tick_sync_broadcast() {
2617 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
2618 .with_sync_interval(5000);
2619 let mesh = HiveMesh::new(config);
2620
2621 mesh.on_ble_discovered(
2623 "device-uuid",
2624 Some("HIVE_TEST-22222222"),
2625 -65,
2626 Some("TEST"),
2627 1000,
2628 );
2629 mesh.on_ble_connected("device-uuid", 1000);
2630
2631 let _result = mesh.tick(0);
2633 let result = mesh.tick(3000);
2637 assert!(result.is_none());
2638
2639 let result = mesh.tick(6000);
2641 assert!(result.is_some());
2642
2643 let result = mesh.tick(6100);
2645 assert!(result.is_none());
2646
2647 let result = mesh.tick(12000);
2649 assert!(result.is_some());
2650 }
2651
2652 #[test]
2653 fn test_incoming_connection() {
2654 let mesh = create_mesh(0x11111111, "ALPHA-1");
2655 let observer = Arc::new(CollectingObserver::new());
2656 mesh.add_observer(observer.clone());
2657
2658 let is_new = mesh.on_incoming_connection("central-uuid", NodeId::new(0x22222222), 1000);
2660
2661 assert!(is_new);
2662 assert_eq!(mesh.peer_count(), 1);
2663 assert_eq!(mesh.connected_count(), 1);
2664
2665 let events = observer.events();
2667 assert!(events
2668 .iter()
2669 .any(|e| matches!(e, HiveEvent::PeerDiscovered { .. })));
2670 assert!(events
2671 .iter()
2672 .any(|e| matches!(e, HiveEvent::PeerConnected { .. })));
2673 }
2674
2675 #[test]
2676 fn test_mesh_filtering() {
2677 let mesh = create_mesh(0x11111111, "ALPHA-1");
2678
2679 let peer = mesh.on_ble_discovered(
2681 "device-uuid-1",
2682 Some("HIVE_OTHER-22222222"),
2683 -65,
2684 Some("OTHER"),
2685 1000,
2686 );
2687 assert!(peer.is_none());
2688 assert_eq!(mesh.peer_count(), 0);
2689
2690 let peer = mesh.on_ble_discovered(
2692 "device-uuid-2",
2693 Some("HIVE_TEST-33333333"),
2694 -65,
2695 Some("TEST"),
2696 1000,
2697 );
2698 assert!(peer.is_some());
2699 assert_eq!(mesh.peer_count(), 1);
2700 }
2701
2702 fn create_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
2705 let config =
2706 HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_encryption(secret);
2707 HiveMesh::new(config)
2708 }
2709
2710 #[test]
2711 fn test_encryption_enabled() {
2712 let secret = [0x42u8; 32];
2713 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2714
2715 assert!(mesh.is_encryption_enabled());
2716 }
2717
2718 #[test]
2719 fn test_encryption_disabled_by_default() {
2720 let mesh = create_mesh(0x11111111, "ALPHA-1");
2721
2722 assert!(!mesh.is_encryption_enabled());
2723 }
2724
2725 #[test]
2726 fn test_encrypted_document_exchange() {
2727 let secret = [0x42u8; 32];
2728 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2729 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
2730
2731 let doc = mesh1.build_document();
2733
2734 assert!(doc.len() >= 2);
2736 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
2737
2738 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2740
2741 assert!(result.is_some());
2742 let result = result.unwrap();
2743 assert_eq!(result.source_node.as_u32(), 0x11111111);
2744 }
2745
2746 #[test]
2747 fn test_encrypted_emergency_exchange() {
2748 let secret = [0x42u8; 32];
2749 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2750 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret);
2751
2752 let observer = Arc::new(CollectingObserver::new());
2753 mesh2.add_observer(observer.clone());
2754
2755 let doc = mesh1.send_emergency(TEST_TIMESTAMP);
2757
2758 let result =
2760 mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, TEST_TIMESTAMP);
2761
2762 assert!(result.is_some());
2763 let result = result.unwrap();
2764 assert!(result.is_emergency);
2765
2766 let events = observer.events();
2768 assert!(events
2769 .iter()
2770 .any(|e| matches!(e, HiveEvent::EmergencyReceived { .. })));
2771 }
2772
2773 #[test]
2774 fn test_wrong_key_fails_decrypt() {
2775 let secret1 = [0x42u8; 32];
2776 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
2778 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
2779
2780 let doc = mesh1.build_document();
2782
2783 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2785
2786 assert!(result.is_none());
2787 }
2788
2789 #[test]
2790 fn test_unencrypted_mesh_can_read_unencrypted() {
2791 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2792 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2793
2794 let doc = mesh1.build_document();
2796
2797 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2799
2800 assert!(result.is_some());
2801 }
2802
2803 #[test]
2804 fn test_encrypted_mesh_can_receive_unencrypted() {
2805 let secret = [0x42u8; 32];
2807 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
2812
2813 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2815
2816 assert!(result.is_some());
2817 }
2818
2819 #[test]
2820 fn test_unencrypted_mesh_cannot_receive_encrypted() {
2821 let secret = [0x42u8; 32];
2822 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret); let mesh2 = create_mesh(0x22222222, "BRAVO-1"); let doc = mesh1.build_document();
2827
2828 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
2830
2831 assert!(result.is_none());
2832 }
2833
2834 #[test]
2835 fn test_enable_disable_encryption() {
2836 let mut mesh = create_mesh(0x11111111, "ALPHA-1");
2837
2838 assert!(!mesh.is_encryption_enabled());
2839
2840 let secret = [0x42u8; 32];
2842 mesh.enable_encryption(&secret);
2843 assert!(mesh.is_encryption_enabled());
2844
2845 let doc = mesh.build_document();
2847 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
2848
2849 mesh.disable_encryption();
2851 assert!(!mesh.is_encryption_enabled());
2852
2853 let doc = mesh.build_document();
2855 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
2856 }
2857
2858 #[test]
2859 fn test_encryption_overhead() {
2860 let secret = [0x42u8; 32];
2861 let mesh_encrypted = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
2862 let mesh_unencrypted = create_mesh(0x22222222, "BRAVO-1");
2863
2864 let doc_encrypted = mesh_encrypted.build_document();
2865 let doc_unencrypted = mesh_unencrypted.build_document();
2866
2867 let overhead = doc_encrypted.len() - doc_unencrypted.len();
2873 assert_eq!(overhead, 30); }
2875
2876 #[test]
2879 fn test_peer_e2ee_enable_disable() {
2880 let mesh = create_mesh(0x11111111, "ALPHA-1");
2881
2882 assert!(!mesh.is_peer_e2ee_enabled());
2883 assert!(mesh.peer_e2ee_public_key().is_none());
2884
2885 mesh.enable_peer_e2ee();
2886 assert!(mesh.is_peer_e2ee_enabled());
2887 assert!(mesh.peer_e2ee_public_key().is_some());
2888
2889 mesh.disable_peer_e2ee();
2890 assert!(!mesh.is_peer_e2ee_enabled());
2891 }
2892
2893 #[test]
2894 fn test_peer_e2ee_initiate_session() {
2895 let mesh = create_mesh(0x11111111, "ALPHA-1");
2896 mesh.enable_peer_e2ee();
2897
2898 let key_exchange = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
2899 assert!(key_exchange.is_some());
2900
2901 let key_exchange = key_exchange.unwrap();
2902 assert_eq!(key_exchange[0], crate::document::KEY_EXCHANGE_MARKER);
2904
2905 assert_eq!(mesh.peer_e2ee_session_count(), 1);
2907 assert_eq!(mesh.peer_e2ee_established_count(), 0);
2908 }
2909
2910 #[test]
2911 fn test_peer_e2ee_full_handshake() {
2912 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2913 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2914
2915 mesh1.enable_peer_e2ee();
2916 mesh2.enable_peer_e2ee();
2917
2918 let observer1 = Arc::new(CollectingObserver::new());
2919 let observer2 = Arc::new(CollectingObserver::new());
2920 mesh1.add_observer(observer1.clone());
2921 mesh2.add_observer(observer2.clone());
2922
2923 let key_exchange1 = mesh1
2925 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
2926 .unwrap();
2927
2928 let response = mesh2.handle_key_exchange(&key_exchange1, 1000);
2930 assert!(response.is_some());
2931
2932 assert!(mesh2.has_peer_e2ee_session(NodeId::new(0x11111111)));
2934
2935 let key_exchange2 = response.unwrap();
2937 let _ = mesh1.handle_key_exchange(&key_exchange2, 1000);
2938
2939 assert!(mesh1.has_peer_e2ee_session(NodeId::new(0x22222222)));
2941
2942 let events1 = observer1.events();
2944 assert!(events1
2945 .iter()
2946 .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
2947
2948 let events2 = observer2.events();
2949 assert!(events2
2950 .iter()
2951 .any(|e| matches!(e, HiveEvent::PeerE2eeEstablished { .. })));
2952 }
2953
2954 #[test]
2955 fn test_peer_e2ee_encrypt_decrypt() {
2956 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2957 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2958
2959 mesh1.enable_peer_e2ee();
2960 mesh2.enable_peer_e2ee();
2961
2962 let key_exchange1 = mesh1
2964 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
2965 .unwrap();
2966 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
2967 mesh1.handle_key_exchange(&key_exchange2, 1000);
2968
2969 let plaintext = b"Secret message from mesh1";
2971 let encrypted = mesh1.send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000);
2972 assert!(encrypted.is_some());
2973
2974 let encrypted = encrypted.unwrap();
2975 assert_eq!(encrypted[0], crate::document::PEER_E2EE_MARKER);
2977
2978 let observer2 = Arc::new(CollectingObserver::new());
2980 mesh2.add_observer(observer2.clone());
2981
2982 let decrypted = mesh2.handle_peer_e2ee_message(&encrypted, 2000);
2983 assert!(decrypted.is_some());
2984 assert_eq!(decrypted.unwrap(), plaintext);
2985
2986 let events = observer2.events();
2988 assert!(events.iter().any(|e| matches!(
2989 e,
2990 HiveEvent::PeerE2eeMessageReceived { from_node, data }
2991 if from_node.as_u32() == 0x11111111 && data == plaintext
2992 )));
2993 }
2994
2995 #[test]
2996 fn test_peer_e2ee_bidirectional() {
2997 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
2998 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
2999
3000 mesh1.enable_peer_e2ee();
3001 mesh2.enable_peer_e2ee();
3002
3003 let key_exchange1 = mesh1
3005 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
3006 .unwrap();
3007 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
3008 mesh1.handle_key_exchange(&key_exchange2, 1000);
3009
3010 let msg1 = mesh1
3012 .send_peer_e2ee(NodeId::new(0x22222222), b"Hello from mesh1", 2000)
3013 .unwrap();
3014 let dec1 = mesh2.handle_peer_e2ee_message(&msg1, 2000).unwrap();
3015 assert_eq!(dec1, b"Hello from mesh1");
3016
3017 let msg2 = mesh2
3019 .send_peer_e2ee(NodeId::new(0x11111111), b"Hello from mesh2", 2000)
3020 .unwrap();
3021 let dec2 = mesh1.handle_peer_e2ee_message(&msg2, 2000).unwrap();
3022 assert_eq!(dec2, b"Hello from mesh2");
3023 }
3024
3025 #[test]
3026 fn test_peer_e2ee_close_session() {
3027 let mesh = create_mesh(0x11111111, "ALPHA-1");
3028 mesh.enable_peer_e2ee();
3029
3030 let observer = Arc::new(CollectingObserver::new());
3031 mesh.add_observer(observer.clone());
3032
3033 mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
3035 assert_eq!(mesh.peer_e2ee_session_count(), 1);
3036
3037 mesh.close_peer_e2ee(NodeId::new(0x22222222));
3039
3040 let events = observer.events();
3042 assert!(events
3043 .iter()
3044 .any(|e| matches!(e, HiveEvent::PeerE2eeClosed { .. })));
3045 }
3046
3047 #[test]
3048 fn test_peer_e2ee_without_enabling() {
3049 let mesh = create_mesh(0x11111111, "ALPHA-1");
3050
3051 let result = mesh.initiate_peer_e2ee(NodeId::new(0x22222222), 1000);
3053 assert!(result.is_none());
3054
3055 let result = mesh.send_peer_e2ee(NodeId::new(0x22222222), b"test", 1000);
3056 assert!(result.is_none());
3057
3058 assert!(!mesh.has_peer_e2ee_session(NodeId::new(0x22222222)));
3059 }
3060
3061 #[test]
3062 fn test_peer_e2ee_overhead() {
3063 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3064 let mesh2 = create_mesh(0x22222222, "BRAVO-1");
3065
3066 mesh1.enable_peer_e2ee();
3067 mesh2.enable_peer_e2ee();
3068
3069 let key_exchange1 = mesh1
3071 .initiate_peer_e2ee(NodeId::new(0x22222222), 1000)
3072 .unwrap();
3073 let key_exchange2 = mesh2.handle_key_exchange(&key_exchange1, 1000).unwrap();
3074 mesh1.handle_key_exchange(&key_exchange2, 1000);
3075
3076 let plaintext = b"Test message";
3078 let encrypted = mesh1
3079 .send_peer_e2ee(NodeId::new(0x22222222), plaintext, 2000)
3080 .unwrap();
3081
3082 let overhead = encrypted.len() - plaintext.len();
3091 assert_eq!(overhead, 46);
3092 }
3093
3094 fn create_strict_encrypted_mesh(node_id: u32, callsign: &str, secret: [u8; 32]) -> HiveMesh {
3097 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST")
3098 .with_encryption(secret)
3099 .with_strict_encryption();
3100 HiveMesh::new(config)
3101 }
3102
3103 #[test]
3104 fn test_strict_encryption_enabled() {
3105 let secret = [0x42u8; 32];
3106 let mesh = create_strict_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3107
3108 assert!(mesh.is_encryption_enabled());
3109 assert!(mesh.is_strict_encryption_enabled());
3110 }
3111
3112 #[test]
3113 fn test_strict_encryption_disabled_by_default() {
3114 let secret = [0x42u8; 32];
3115 let mesh = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3116
3117 assert!(mesh.is_encryption_enabled());
3118 assert!(!mesh.is_strict_encryption_enabled());
3119 }
3120
3121 #[test]
3122 fn test_strict_encryption_requires_encryption_enabled() {
3123 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3125 .with_strict_encryption(); let mesh = HiveMesh::new(config);
3127
3128 assert!(!mesh.is_encryption_enabled());
3129 assert!(!mesh.is_strict_encryption_enabled());
3130 }
3131
3132 #[test]
3133 fn test_strict_mode_accepts_encrypted_documents() {
3134 let secret = [0x42u8; 32];
3135 let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret);
3136 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3137
3138 let doc = mesh1.build_document();
3140 assert_eq!(doc[0], crate::document::ENCRYPTED_MARKER);
3141
3142 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3144 assert!(result.is_some());
3145 }
3146
3147 #[test]
3148 fn test_strict_mode_rejects_unencrypted_documents() {
3149 let secret = [0x42u8; 32];
3150 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret); let observer = Arc::new(CollectingObserver::new());
3154 mesh2.add_observer(observer.clone());
3155
3156 let doc = mesh1.build_document();
3158 assert_ne!(doc[0], crate::document::ENCRYPTED_MARKER);
3159
3160 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3162 assert!(result.is_none());
3163
3164 let events = observer.events();
3166 assert!(events.iter().any(|e| matches!(
3167 e,
3168 HiveEvent::SecurityViolation {
3169 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
3170 ..
3171 }
3172 )));
3173 }
3174
3175 #[test]
3176 fn test_non_strict_mode_accepts_unencrypted_documents() {
3177 let secret = [0x42u8; 32];
3178 let mesh1 = create_mesh(0x11111111, "ALPHA-1"); let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret); let doc = mesh1.build_document();
3183
3184 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3186 assert!(result.is_some());
3187 }
3188
3189 #[test]
3190 fn test_strict_mode_security_violation_event_includes_source() {
3191 let secret = [0x42u8; 32];
3192 let mesh1 = create_mesh(0x11111111, "ALPHA-1");
3193 let mesh2 = create_strict_encrypted_mesh(0x22222222, "BRAVO-1", secret);
3194
3195 let observer = Arc::new(CollectingObserver::new());
3196 mesh2.add_observer(observer.clone());
3197
3198 let doc = mesh1.build_document();
3199
3200 mesh2.on_ble_discovered(
3202 "test-device-uuid",
3203 Some("HIVE_TEST-11111111"),
3204 -65,
3205 Some("TEST"),
3206 500,
3207 );
3208 mesh2.on_ble_connected("test-device-uuid", 600);
3209
3210 let _result = mesh2.on_ble_data_received("test-device-uuid", &doc, 1000);
3211
3212 let events = observer.events();
3214 let violation = events.iter().find(|e| {
3215 matches!(
3216 e,
3217 HiveEvent::SecurityViolation {
3218 kind: crate::observer::SecurityViolationKind::UnencryptedInStrictMode,
3219 ..
3220 }
3221 )
3222 });
3223 assert!(violation.is_some());
3224
3225 if let Some(HiveEvent::SecurityViolation { source, .. }) = violation {
3226 assert!(source.is_some());
3227 assert_eq!(source.as_ref().unwrap(), "test-device-uuid");
3228 }
3229 }
3230
3231 #[test]
3232 fn test_decryption_failure_emits_security_violation() {
3233 let secret1 = [0x42u8; 32];
3234 let secret2 = [0x43u8; 32]; let mesh1 = create_encrypted_mesh(0x11111111, "ALPHA-1", secret1);
3236 let mesh2 = create_encrypted_mesh(0x22222222, "BRAVO-1", secret2);
3237
3238 let observer = Arc::new(CollectingObserver::new());
3239 mesh2.add_observer(observer.clone());
3240
3241 let doc = mesh1.build_document();
3243
3244 let result = mesh2.on_ble_data_received_from_node(NodeId::new(0x11111111), &doc, 1000);
3246 assert!(result.is_none());
3247
3248 let events = observer.events();
3250 assert!(events.iter().any(|e| matches!(
3251 e,
3252 HiveEvent::SecurityViolation {
3253 kind: crate::observer::SecurityViolationKind::DecryptionFailed,
3254 ..
3255 }
3256 )));
3257 }
3258
3259 #[test]
3260 fn test_strict_mode_builder_chain() {
3261 let secret = [0x42u8; 32];
3262 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3263 .with_encryption(secret)
3264 .with_strict_encryption()
3265 .with_sync_interval(10_000)
3266 .with_peer_timeout(60_000);
3267
3268 let mesh = HiveMesh::new(config);
3269
3270 assert!(mesh.is_encryption_enabled());
3271 assert!(mesh.is_strict_encryption_enabled());
3272 }
3273
3274 fn create_relay_mesh(node_id: u32, callsign: &str) -> HiveMesh {
3277 let config = HiveMeshConfig::new(NodeId::new(node_id), callsign, "TEST").with_relay();
3278 HiveMesh::new(config)
3279 }
3280
3281 #[test]
3282 fn test_relay_disabled_by_default() {
3283 let mesh = create_mesh(0x11111111, "ALPHA-1");
3284 assert!(!mesh.is_relay_enabled());
3285 }
3286
3287 #[test]
3288 fn test_relay_enabled() {
3289 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3290 assert!(mesh.is_relay_enabled());
3291 }
3292
3293 #[test]
3294 fn test_relay_config_builder() {
3295 let config = HiveMeshConfig::new(NodeId::new(0x11111111), "ALPHA-1", "TEST")
3296 .with_relay()
3297 .with_max_relay_hops(5)
3298 .with_relay_fanout(3)
3299 .with_seen_cache_ttl(60_000);
3300
3301 assert!(config.enable_relay);
3302 assert_eq!(config.max_relay_hops, 5);
3303 assert_eq!(config.relay_fanout, 3);
3304 assert_eq!(config.seen_cache_ttl_ms, 60_000);
3305 }
3306
3307 #[test]
3308 fn test_seen_message_deduplication() {
3309 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3310 let origin = NodeId::new(0x22222222);
3311 let msg_id = crate::relay::MessageId::from_content(origin, 1000, 0xDEADBEEF);
3312
3313 assert!(mesh.mark_message_seen(msg_id, origin, 1000));
3315
3316 assert!(!mesh.mark_message_seen(msg_id, origin, 2000));
3318
3319 assert_eq!(mesh.seen_cache_size(), 1);
3320 }
3321
3322 #[test]
3323 fn test_wrap_for_relay() {
3324 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3325
3326 let payload = vec![1, 2, 3, 4, 5];
3327 let wrapped = mesh.wrap_for_relay(payload.clone());
3328
3329 assert_eq!(wrapped[0], crate::relay::RELAY_ENVELOPE_MARKER);
3331
3332 let envelope = crate::relay::RelayEnvelope::decode(&wrapped).unwrap();
3334 assert_eq!(envelope.payload, payload);
3335 assert_eq!(envelope.origin_node, NodeId::new(0x11111111));
3336 assert_eq!(envelope.hop_count, 0);
3337 }
3338
3339 #[test]
3340 fn test_process_relay_envelope_new_message() {
3341 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3342 let observer = Arc::new(CollectingObserver::new());
3343 mesh.add_observer(observer.clone());
3344
3345 let payload = vec![1, 2, 3, 4, 5];
3347 let envelope =
3348 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
3349 .with_max_hops(7);
3350 let data = envelope.encode();
3351
3352 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3354
3355 assert!(decision.is_some());
3356 let decision = decision.unwrap();
3357 assert_eq!(decision.payload, payload);
3358 assert_eq!(decision.origin_node.as_u32(), 0x22222222);
3359 assert_eq!(decision.hop_count, 0);
3360 assert!(decision.should_relay);
3361 assert!(decision.relay_envelope.is_some());
3362
3363 let relay_env = decision.relay_envelope.unwrap();
3365 assert_eq!(relay_env.hop_count, 1);
3366 }
3367
3368 #[test]
3369 fn test_process_relay_envelope_duplicate() {
3370 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3371 let observer = Arc::new(CollectingObserver::new());
3372 mesh.add_observer(observer.clone());
3373
3374 let payload = vec![1, 2, 3, 4, 5];
3375 let envelope = crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload);
3376 let data = envelope.encode();
3377
3378 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3380 assert!(decision.is_some());
3381
3382 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 2000);
3384 assert!(decision.is_none());
3385
3386 let events = observer.events();
3388 assert!(events
3389 .iter()
3390 .any(|e| matches!(e, HiveEvent::DuplicateMessageDropped { .. })));
3391 }
3392
3393 #[test]
3394 fn test_process_relay_envelope_ttl_expired() {
3395 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3396 let observer = Arc::new(CollectingObserver::new());
3397 mesh.add_observer(observer.clone());
3398
3399 let payload = vec![1, 2, 3, 4, 5];
3401 let mut envelope =
3402 crate::relay::RelayEnvelope::broadcast(NodeId::new(0x22222222), payload.clone())
3403 .with_max_hops(3);
3404
3405 envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); envelope = envelope.relay().unwrap(); let data = envelope.encode();
3411
3412 let decision = mesh.process_relay_envelope(&data, NodeId::new(0x33333333), 1000);
3414
3415 assert!(decision.is_some());
3416 let decision = decision.unwrap();
3417 assert_eq!(decision.payload, payload);
3418 assert!(!decision.should_relay); assert!(decision.relay_envelope.is_none());
3420
3421 let events = observer.events();
3423 assert!(events
3424 .iter()
3425 .any(|e| matches!(e, HiveEvent::MessageTtlExpired { .. })));
3426 }
3427
3428 #[test]
3429 fn test_build_relay_document() {
3430 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3431
3432 let relay_doc = mesh.build_relay_document();
3433
3434 assert_eq!(relay_doc[0], crate::relay::RELAY_ENVELOPE_MARKER);
3436
3437 let envelope = crate::relay::RelayEnvelope::decode(&relay_doc).unwrap();
3439 assert_eq!(envelope.origin_node.as_u32(), 0x11111111);
3440
3441 let doc = crate::document::HiveDocument::decode(&envelope.payload);
3443 assert!(doc.is_some());
3444 }
3445
3446 #[test]
3447 fn test_relay_targets_excludes_source() {
3448 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3449
3450 mesh.on_ble_discovered(
3452 "peer-1",
3453 Some("HIVE_TEST-22222222"),
3454 -60,
3455 Some("TEST"),
3456 1000,
3457 );
3458 mesh.on_ble_connected("peer-1", 1000);
3459
3460 mesh.on_ble_discovered(
3461 "peer-2",
3462 Some("HIVE_TEST-33333333"),
3463 -65,
3464 Some("TEST"),
3465 1000,
3466 );
3467 mesh.on_ble_connected("peer-2", 1000);
3468
3469 mesh.on_ble_discovered(
3470 "peer-3",
3471 Some("HIVE_TEST-44444444"),
3472 -70,
3473 Some("TEST"),
3474 1000,
3475 );
3476 mesh.on_ble_connected("peer-3", 1000);
3477
3478 let targets = mesh.get_relay_targets(Some(NodeId::new(0x33333333)));
3480
3481 assert!(targets.iter().all(|p| p.node_id.as_u32() != 0x33333333));
3483 }
3484
3485 #[test]
3486 fn test_clear_seen_cache() {
3487 let mesh = create_relay_mesh(0x11111111, "ALPHA-1");
3488 let origin = NodeId::new(0x22222222);
3489
3490 mesh.mark_message_seen(
3492 crate::relay::MessageId::from_content(origin, 1000, 0x11111111),
3493 origin,
3494 1000,
3495 );
3496 mesh.mark_message_seen(
3497 crate::relay::MessageId::from_content(origin, 2000, 0x22222222),
3498 origin,
3499 2000,
3500 );
3501
3502 assert_eq!(mesh.seen_cache_size(), 2);
3503
3504 mesh.clear_seen_cache();
3506 assert_eq!(mesh.seen_cache_size(), 0);
3507 }
3508}