1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::SignatureType;
3use crate::stdlib::boxed::Box;
4use crate::stdlib::{cell::RefCell, rc::Rc};
5use crate::task::{self, sleep, spawn_with_panic_notify};
6use crate::utils::time::Time;
7use core::prelude::rust_2024::*;
8use core::result::Result;
9use futures::channel::oneshot::Sender;
10use itertools::Itertools;
11use log::{debug, error, info, warn};
12use core::cmp::PartialEq;
13use crate::collections::{HashMap, HashSet};
14use core::fmt::{Debug, Display, Formatter};
15use crate::stdlib::sync::{Arc};
16use crate::std_sync::Mutex;
17use core::time::Duration;
18#[cfg(feature = "tokio_runtime")]
19use tokio::task::yield_now;
20use crate::stdlib::vec::Vec;
21use crate::stdlib::vec;
22use crate::stdlib::string::String;
23use crate::stdlib::string::ToString;
24use super::com_interfaces::com_interface::{
25 self, ComInterfaceError, ComInterfaceState
26};
27use super::com_interfaces::{
28 com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
29};
30use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
31use crate::global::dxb_block::{DXBBlock, IncomingSection};
32use crate::network::block_handler::{BlockHandler, BlockHistoryData};
33use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
34use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
35use crate::network::com_interfaces::com_interface_properties::{
36 InterfaceDirection, ReconnectionConfig,
37};
38use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
39use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
40use crate::runtime::AsyncContext;
41use crate::values::value_container::ValueContainer;
42
43#[derive(Debug, Clone)]
44pub struct DynamicEndpointProperties {
45 pub known_since: u64,
46 pub distance: i8,
47 pub is_direct: bool,
48 pub channel_factor: u32,
49 pub direction: InterfaceDirection,
50}
51
52pub type ComInterfaceFactoryFn =
53 fn(
54 setup_data: ValueContainer,
55 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
56
57#[derive(Debug)]
58pub struct ComHubOptions {
59 default_receive_timeout: Duration,
60}
61
62impl Default for ComHubOptions {
63 fn default() -> Self {
64 ComHubOptions {
65 default_receive_timeout: Duration::from_secs(5),
66 }
67 }
68}
69
70type SocketMap = HashMap<
71 ComInterfaceSocketUUID,
72 (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
73>;
74type InterfaceMap = HashMap<
75 ComInterfaceUUID,
76 (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
77>;
78
79pub type IncomingBlockInterceptor =
80 Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static>;
81
82pub type OutgoingBlockInterceptor =
83 Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static>;
84
85pub struct ComHub {
86 pub endpoint: Endpoint,
88
89 pub async_context: AsyncContext,
90
91 pub options: ComHubOptions,
93
94 pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,
96
97 pub interfaces: RefCell<InterfaceMap>,
99
100 pub sockets: RefCell<SocketMap>,
103
104 pub endpoint_sockets_blacklist:
106 RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,
107
108 pub fallback_sockets:
111 RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,
112
113 pub endpoint_sockets: RefCell<
116 HashMap<
117 Endpoint,
118 Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
119 >,
120 >,
121
122 update_loop_running: RefCell<bool>,
125 update_loop_stop_sender: RefCell<Option<Sender<()>>>,
126
127 pub block_handler: BlockHandler,
128
129 incoming_block_interceptors: RefCell<Vec<IncomingBlockInterceptor>>,
130 outgoing_block_interceptors: RefCell<Vec<OutgoingBlockInterceptor>>,
131}
132
133impl Debug for ComHub {
134 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
135 f.debug_struct("ComHub")
136 .field("endpoint", &self.endpoint)
137 .field("options", &self.options)
138 .field("sockets", &self.sockets)
139 .field(
140 "endpoint_sockets_blacklist",
141 &self.endpoint_sockets_blacklist,
142 )
143 .field("fallback_sockets", &self.fallback_sockets)
144 .field("endpoint_sockets", &self.endpoint_sockets)
145 .finish()
146 }
147}
148
149#[derive(Debug, Clone, Default)]
150struct EndpointIterateOptions<'a> {
151 pub only_direct: bool,
152 pub exact_instance: bool,
153 pub exclude_sockets: &'a [ComInterfaceSocketUUID],
154}
155
156#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
157pub enum InterfacePriority {
158 None,
162 Priority(u16),
166}
167
168impl Default for InterfacePriority {
169 fn default() -> Self {
170 InterfacePriority::Priority(0)
171 }
172}
173
174#[derive(Debug, Clone, PartialEq)]
175pub enum ComHubError {
176 InterfaceError(ComInterfaceError),
177 InterfaceOpenFailed,
178 InterfaceCloseFailed,
179 InterfaceAlreadyExists,
180 InterfaceDoesNotExist,
181 InterfaceNotConnected,
182 InterfaceTypeDoesNotExist,
183 InvalidInterfaceDirectionForFallbackInterface,
184 NoResponse,
185 SignatureError,
186}
187
188impl Display for ComHubError {
189 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
190 match self {
191 ComHubError::InterfaceError(_msg) => {
192 core::write!(f, "ComHubError: ComInterfaceError")
193 }
194 ComHubError::InterfaceCloseFailed => {
195 core::write!(f, "ComHubError: Failed to close interface")
196 }
197 ComHubError::InterfaceNotConnected => {
198 core::write!(f, "ComHubError: Interface not connected")
199 }
200 ComHubError::InterfaceDoesNotExist => {
201 core::write!(f, "ComHubError: Interface does not exit")
202 }
203 ComHubError::InterfaceAlreadyExists => {
204 core::write!(f, "ComHubError: Interface already exists")
205 }
206 ComHubError::InterfaceTypeDoesNotExist => {
207 core::write!(f, "ComHubError: Type of interface does not exist")
208 }
209 ComHubError::InvalidInterfaceDirectionForFallbackInterface => {
210 core::write!(
211 f,
212 "ComHubError: Invalid direction for fallback interface"
213 )
214 }
215 ComHubError::NoResponse => {
216 core::write!(f, "ComHubError: No response")
217 }
218 ComHubError::InterfaceOpenFailed => {
219 core::write!(f, "ComHubError: Failed to open interface")
220 }
221 ComHubError::SignatureError => {
222 core::write!(f, "ComHubError: CryptoError")
223 }
224 }
225 }
226}
227
228#[derive(Debug)]
229pub enum SocketEndpointRegistrationError {
230 SocketDisconnected,
231 SocketUninitialized,
232 SocketEndpointAlreadyRegistered,
233}
234
235#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
236async fn update_loop_task(self_rc: Rc<ComHub>) {
237 while *self_rc.update_loop_running.borrow() {
238 self_rc.update().await;
239 sleep(Duration::from_millis(1)).await;
240 }
241 if let Some(sender) = self_rc.update_loop_stop_sender.borrow_mut().take() {
242 sender.send(()).expect("Failed to send stop signal");
243 }
244}
245
246#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
247async fn reconnect_interface_task(interface_rc: Rc<RefCell<dyn ComInterface>>) {
248 let interface = interface_rc.clone();
249 let mut interface = interface.borrow_mut();
250
251 let config = interface.get_properties_mut();
252 config.close_timestamp = None;
253
254 let current_attempts = config.reconnect_attempts.unwrap_or(0);
255 config.reconnect_attempts = Some(current_attempts + 1);
256
257 let res = interface.handle_open().await;
258 if res {
259 interface.set_state(ComInterfaceState::Connected);
260 } else {
262 interface.set_state(ComInterfaceState::NotConnected);
263 }
264}
265
266impl ComHub {
267 pub fn new(
268 endpoint: impl Into<Endpoint>,
269 async_context: AsyncContext,
270 ) -> ComHub {
271 ComHub {
272 endpoint: endpoint.into(),
273 async_context,
274 options: ComHubOptions::default(),
275 interface_factories: RefCell::new(HashMap::new()),
276 interfaces: RefCell::new(HashMap::new()),
277 endpoint_sockets: RefCell::new(HashMap::new()),
278 block_handler: BlockHandler::new(),
279 sockets: RefCell::new(HashMap::new()),
280 fallback_sockets: RefCell::new(Vec::new()),
281 endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
282 update_loop_running: RefCell::new(false),
283 update_loop_stop_sender: RefCell::new(None),
284 incoming_block_interceptors: RefCell::new(Vec::new()),
285 outgoing_block_interceptors: RefCell::new(Vec::new()),
286 }
287 }
288
289 pub async fn init(&self) -> Result<(), ComHubError> {
290 let local_interface = LocalLoopbackInterface::new();
292 self.open_and_add_interface(
293 Rc::new(RefCell::new(local_interface)),
294 InterfacePriority::None,
295 )
296 .await
297 }
298
299 pub fn register_interface_factory(
302 &self,
303 interface_type: String,
304 factory: ComInterfaceFactoryFn,
305 ) {
306 self.interface_factories
307 .borrow_mut()
308 .insert(interface_type, factory);
309 }
310
311 pub async fn create_interface(
315 &self,
316 interface_type: &str,
317 setup_data: ValueContainer,
318 priority: InterfacePriority,
319 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
320 info!("creating interface {interface_type}");
321 let interface_factories = self.interface_factories.borrow();
322 if let Some(factory) = interface_factories.get(interface_type) {
323 let interface =
324 factory(setup_data).map_err(ComHubError::InterfaceError)?;
325 drop(interface_factories);
326
327 self.open_and_add_interface(interface.clone(), priority)
328 .await
329 .map(|_| interface)
330 } else {
331 Err(ComHubError::InterfaceTypeDoesNotExist)
332 }
333 }
334
335 fn try_downcast<T: 'static>(
336 input: Rc<RefCell<dyn ComInterface>>,
337 ) -> Option<Rc<RefCell<T>>> {
338 if input.borrow().as_any().is::<T>() {
340 let ptr = Rc::into_raw(input) as *const RefCell<T>;
342 unsafe { Some(Rc::from_raw(ptr)) }
343 } else {
344 None
345 }
346 }
347
348 pub fn register_incoming_block_interceptor<F>(&self, interceptor: F)
350 where
351 F: Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static,
352 {
353 self.incoming_block_interceptors
354 .borrow_mut()
355 .push(Box::new(interceptor));
356 }
357
358 pub fn register_outgoing_block_interceptor<F>(&self, interceptor: F)
360 where
361 F: Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static,
362 {
363 self.outgoing_block_interceptors
364 .borrow_mut()
365 .push(Box::new(interceptor));
366 }
367
368 pub fn get_interface_by_uuid<T: ComInterface>(
369 &self,
370 interface_uuid: &ComInterfaceUUID,
371 ) -> Option<Rc<RefCell<T>>> {
372 ComHub::try_downcast(
373 self.interfaces.borrow().get(interface_uuid)?.0.clone(),
374 )
375 }
376
377 pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
378 self.interfaces.borrow().contains_key(interface_uuid)
379 }
380
381 pub fn get_dyn_interface_by_uuid(
382 &self,
383 uuid: &ComInterfaceUUID,
384 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
385 self.interfaces
386 .borrow()
387 .get(uuid)
388 .map(|(interface, _)| interface.clone())
389 }
390
391 pub async fn open_and_add_interface(
392 &self,
393 interface: Rc<RefCell<dyn ComInterface>>,
394 priority: InterfacePriority,
395 ) -> Result<(), ComHubError> {
396 if interface.borrow().get_state() != ComInterfaceState::Connected {
397 if !(interface.borrow_mut().handle_open().await) {
401 return Err(ComHubError::InterfaceOpenFailed);
402 }
403 }
404 self.add_interface(interface.clone(), priority)
405 }
406
407 pub fn add_interface(
408 &self,
409 interface: Rc<RefCell<dyn ComInterface>>,
410 priority: InterfacePriority,
411 ) -> Result<(), ComHubError> {
412 let uuid = interface.borrow().get_uuid().clone();
413 let mut interfaces = self.interfaces.borrow_mut();
414 if interfaces.contains_key(&uuid) {
415 return Err(ComHubError::InterfaceAlreadyExists);
416 }
417
418 if priority != InterfacePriority::None
420 && interface.borrow_mut().get_properties().direction
421 == InterfaceDirection::In
422 {
423 return Err(
424 ComHubError::InvalidInterfaceDirectionForFallbackInterface,
425 );
426 }
427
428 interfaces.insert(uuid, (interface, priority));
429 Ok(())
430 }
431
432 pub async fn remove_interface(
435 &self,
436 interface_uuid: ComInterfaceUUID,
437 ) -> Result<(), ComHubError> {
438 info!("Removing interface {interface_uuid}");
439 let interface = self
440 .interfaces
441 .borrow_mut()
442 .get_mut(&interface_uuid.clone())
443 .ok_or(ComHubError::InterfaceDoesNotExist)?
444 .0
445 .clone();
446 {
447 let mut interface = interface.borrow_mut();
450 interface.handle_destroy().await;
451 }
452
453 self.update_sockets().await;
455
456 self.cleanup_interface(interface_uuid)
457 .ok_or(ComHubError::InterfaceDoesNotExist)?;
458
459 Ok(())
460 }
461
462 fn cleanup_interface(
465 &self,
466 interface_uuid: ComInterfaceUUID,
467 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
468 let interface = self
469 .interfaces
470 .borrow_mut()
471 .remove(&interface_uuid)
472 .or(None)?
473 .0;
474 Some(interface)
475 }
476
477 pub(crate) async fn receive_block(
478 &self,
479 block: &DXBBlock,
480 socket_uuid: ComInterfaceSocketUUID,
481 ) {
482 info!("{} received block: {}", self.endpoint, block);
483
484 match self.validate_block(block).await {
486 Ok(true) => { }
487 Ok(false) => {
488 warn!("Block validation failed. Dropping block...");
489 return;
490 }
491 Err(e) => {
492 warn!("Error in block validation {e}. Dropping block...");
493 return;
494 }
495 }
496
497 for interceptor in self.incoming_block_interceptors.borrow().iter() {
498 interceptor(block, &socket_uuid);
499 }
500
501 let block_type = block.block_header.flags_and_timestamp.block_type();
502
503 let is_new_block = !self.block_handler.is_block_in_history(block);
505 if is_new_block && block.routing_header.sender != self.endpoint {
508 self.register_socket_endpoint_from_incoming_block(
509 socket_uuid.clone(),
510 block,
511 );
512 }
513
514 let receivers = block.receiver_endpoints();
515 if !receivers.is_empty() {
516 let is_for_own = receivers.iter().any(|e| {
517 e == &self.endpoint
518 || e == &Endpoint::ANY
519 || e == &Endpoint::ANY_ALL_INSTANCES
520 });
521
522 if is_for_own && block_type != BlockType::Hello {
524 info!("Block is for this endpoint");
525
526 match block_type {
527 BlockType::Trace => {
528 self.handle_trace_block(block, socket_uuid.clone())
529 .await;
530 }
531 BlockType::TraceBack => {
532 self.handle_trace_back_block(
533 block,
534 socket_uuid.clone(),
535 );
536 }
537 _ => {
538 self.block_handler.handle_incoming_block(block.clone());
539 }
540 };
541 }
542
543 let should_relay =
545 !(
547 is_for_own && block_type == BlockType::Hello
548 );
549
550 if should_relay {
552 let remaining_receivers = if is_for_own {
554 &self.get_remote_receivers(&receivers)
555 } else {
556 &receivers
557 };
558
559 if !remaining_receivers.is_empty() {
561 match block_type {
562 BlockType::Trace | BlockType::TraceBack => {
563 self.redirect_trace_block(
564 block.clone_with_new_receivers(
565 remaining_receivers,
566 ),
567 socket_uuid.clone(),
568 is_for_own,
569 );
570 }
571 _ => {
572 self.redirect_block(
573 block.clone_with_new_receivers(
574 remaining_receivers,
575 ),
576 socket_uuid.clone(),
577 is_for_own,
578 );
579 }
580 }
581 }
582 }
583 }
584
585 if is_new_block {
587 self.block_handler
588 .add_block_to_history(block, Some(socket_uuid));
589 }
590 }
591
592 fn get_remote_receivers(
595 &self,
596 receiver_endpoints: &[Endpoint],
597 ) -> Vec<Endpoint> {
598 receiver_endpoints
599 .iter()
600 .filter(|e| e != &&self.endpoint)
601 .cloned()
602 .collect::<Vec<_>>()
603 }
604
605 fn register_socket_endpoint_from_incoming_block(
608 &self,
609 socket_uuid: ComInterfaceSocketUUID,
610 block: &DXBBlock,
611 ) {
612 let socket = self.get_socket_by_uuid(&socket_uuid);
613 let mut socket_ref = socket.try_lock().unwrap();
614
615 let distance = block.routing_header.distance;
616 let sender = block.routing_header.sender.clone();
617
618 if socket_ref.direct_endpoint.is_none() && distance == 1 {
620 info!(
621 "Setting direct endpoint for socket {}: {}",
622 socket_ref.uuid, sender
623 );
624 socket_ref.direct_endpoint = Some(sender.clone());
625 }
626
627 drop(socket_ref);
628 match self.register_socket_endpoint(
629 socket.clone(),
630 sender.clone(),
631 distance,
632 ) {
633 Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
634 debug!(
635 "Socket already registered for endpoint {sender}",
636 );
637 }
638 Err(error) => {
639 core::panic!("Failed to register socket endpoint {sender}: {error:?}");
640 },
641 Ok(_) => { }
642 }
643 }
644
645 pub(crate) fn redirect_block(
648 &self,
649 mut block: DXBBlock,
650 incoming_socket: ComInterfaceSocketUUID,
651 forked: bool,
653 ) {
654 let receivers = block.receiver_endpoints();
655
656 let history_block_data =
659 self.block_handler.get_block_data_from_history(&block);
660 if history_block_data.is_some() {
661 for receiver in &receivers {
662 if receiver != &self.endpoint {
663 info!(
664 "{}: Adding socket {} to blacklist for receiver {}",
665 self.endpoint, incoming_socket, receiver
666 );
667 self.endpoint_sockets_blacklist
668 .borrow_mut()
669 .entry(receiver.clone())
670 .or_default()
671 .insert(incoming_socket.clone());
672 }
673 }
674 }
675
676 block.routing_header.distance += 1;
678
679 if block.routing_header.ttl > 1 {
682 block.routing_header.ttl -= 1;
683 }
684 else if block.routing_header.ttl == 1 {
686 block.routing_header.ttl -= 1;
687 warn!("Block TTL expired. Dropping block...");
688 return;
689 } else {
691 warn!("Block TTL expired. Dropping block...");
692 return;
693 }
694
695 let mut prefer_incoming_socket_for_bounce_back = false;
696 let res = {
699 if block.routing_header.sender == self.endpoint {
700 prefer_incoming_socket_for_bounce_back =
702 !block.is_bounce_back();
703 Err(receivers.to_vec())
704 } else {
705 let mut excluded_sockets = vec![incoming_socket.clone()];
706 if let Some(BlockHistoryData {
707 original_socket_uuid: Some(original_socket_uuid),
708 }) = &history_block_data
709 {
710 excluded_sockets.push(original_socket_uuid.clone())
711 }
712 self.send_block(block.clone(), excluded_sockets, forked)
713 }
714 };
715
716 if let Err(unreachable_endpoints) = res {
718 let send_back_socket = if !prefer_incoming_socket_for_bounce_back
722 && let Some(history_block_data) = history_block_data
723 {
724 history_block_data.original_socket_uuid
725 } else {
726 Some(incoming_socket.clone())
727 };
728
729 if let Some(send_back_socket) = send_back_socket {
732 if block.is_bounce_back() && send_back_socket == incoming_socket
734 {
735 warn!(
736 "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
737 self.endpoint
738 );
739 } else if self
740 .get_socket_by_uuid(&send_back_socket)
741 .try_lock()
742 .unwrap()
743 .can_send()
744 {
745 block.set_bounce_back(true);
746 self.send_block_to_endpoints_via_socket(
747 block,
748 &send_back_socket,
749 &unreachable_endpoints,
750 if forked { Some(0) } else { None },
751 )
752 } else {
753 error!(
754 "Tried to send bounce back block, but cannot send back to incoming socket"
755 )
756 }
757 }
758 else {
762 self.send_block(block, vec![], forked).unwrap_or_else(|_| {
763 error!(
764 "Failed to send out block to {}",
765 unreachable_endpoints
766 .iter()
767 .map(|e| e.to_string())
768 .join(",")
769 );
770 });
771 }
772 }
773 }
774
775 pub async fn validate_block(
778 &self,
779 block: &DXBBlock,
780 ) -> Result<bool, ComHubError> {
781 let is_signed =
784 block.routing_header.flags.signature_type() != SignatureType::None;
785
786 match is_signed {
787 true => {
788 cfg_if::cfg_if! {
792 if #[cfg(feature = "native_crypto")] {
793 use crate::runtime::global_context::get_global_context;
794 match block.routing_header.flags.signature_type() {
795 SignatureType::Encrypted => {
796 let crypto = get_global_context().crypto;
797 let raw_sign = block.signature
798 .as_ref()
799 .ok_or(ComHubError::SignatureError)?;
800 let (enc_sign, pub_key) = raw_sign.split_at(64);
801 let hash = crypto.hkdf_sha256(pub_key, &[0u8; 16])
802 .await
803 .map_err(|_| ComHubError::SignatureError)?;
804 let signature = crypto
805 .aes_ctr_decrypt(&hash, &[0u8; 16], enc_sign)
806 .await
807 .map_err(|_| ComHubError::SignatureError)?;
808
809 let raw_signed = [
810 pub_key,
811 &block.body.clone()
812 ]
813 .concat();
814 let hashed_signed = crypto
815 .hash_sha256(&raw_signed)
816 .await
817 .map_err(|_| ComHubError::SignatureError)?;
818
819 let ver = crypto
820 .ver_ed25519(pub_key, &signature, &hashed_signed)
821 .await
822 .map_err(|_| ComHubError::SignatureError)?;
823 return Ok(ver);
824 },
825 SignatureType::Unencrypted => {
826 let crypto = get_global_context().crypto;
827 let raw_sign = block.signature
828 .as_ref()
829 .ok_or(ComHubError::SignatureError)?;
830 let (signature, pub_key) = raw_sign.split_at(64);
831
832 let raw_signed = [
833 pub_key,
834 &block.body.clone()
835 ]
836 .concat();
837 let hashed_signed = crypto
838 .hash_sha256(&raw_signed)
839 .await
840 .map_err(|_| ComHubError::SignatureError)?;
841
842 let ver = crypto
843 .ver_ed25519(pub_key, signature, &hashed_signed)
844 .await
845 .map_err(|_| ComHubError::SignatureError)?;
846 return Ok(ver);
847 },
848 SignatureType::None => {
849 unreachable!("If (is_signed == true) => !None");
850 }
851 }
852 }
853 else {
854 Ok(true)
855 }
856 }
857 }
858 false => {
859 let endpoint = block.routing_header.sender.clone();
860 let is_trusted = {
861 cfg_if::cfg_if! {
862 if #[cfg(feature = "debug")] {
863 use crate::runtime::global_context::get_global_context;
864 get_global_context().debug_flags.allow_unsigned_blocks
865 }
866 else {
867 false
869 }
870 }
871 };
872 match is_trusted {
873 true => Ok(true),
874 false => {
875 warn!(
876 "Block received by {endpoint} is not signed. Dropping block..."
877 );
878 Ok(false)
879 }
880 }
881 }
882 }
883 }
884
885 pub fn register_socket_endpoint(
890 &self,
891 socket: Arc<Mutex<ComInterfaceSocket>>,
892 endpoint: Endpoint,
893 distance: i8,
894 ) -> Result<(), SocketEndpointRegistrationError> {
895 log::info!(
896 "{} registering endpoint {} for socket {}",
897 self.endpoint,
898 endpoint,
899 socket.try_lock().unwrap().uuid
900 );
901 let socket_ref = socket.try_lock().unwrap();
902
903 let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
906
907 if !socket_ref.state.is_open() {
909 return Err(SocketEndpointRegistrationError::SocketDisconnected);
910 }
911
912 if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
914 && entries
915 .iter()
916 .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
917 {
918 return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
919 }
920
921 let socket_uuid = socket_ref.uuid.clone();
922 let channel_factor = socket_ref.channel_factor;
923 let direction = socket_ref.direction.clone();
924 drop(socket_ref);
925
926 self.add_socket_endpoint(&socket_uuid, endpoint.clone());
928
929 self.add_endpoint_socket(
931 &endpoint,
932 socket_uuid,
933 distance,
934 is_direct,
935 channel_factor,
936 direction,
937 );
938
939 self.sort_sockets(&endpoint);
941
942 Ok(())
943 }
944
945 pub async fn wait_for_update_async(&self) {
949 loop {
950 let mut is_done = true;
951 for interface in self.interfaces.borrow().values() {
952 let interface = interface.0.clone();
953 let interface = interface.borrow_mut();
954 let outgoing_blocks_count =
955 interface.get_info().outgoing_blocks_count.get();
956 if outgoing_blocks_count > 0 {
958 is_done = false;
959 break;
960 }
961 if interface.get_state() == ComInterfaceState::Connecting {
963 is_done = false;
964 break;
965 }
966 }
967 if is_done {
968 break;
969 }
970 sleep(Duration::from_millis(10)).await;
971 }
972 }
973
974 pub async fn update_async(&self) {
978 self.update().await;
979 self.wait_for_update_async().await;
980 }
981
982 fn add_endpoint_socket(
985 &self,
986 endpoint: &Endpoint,
987 socket_uuid: ComInterfaceSocketUUID,
988 distance: i8,
989 is_direct: bool,
990 channel_factor: u32,
991 direction: InterfaceDirection,
992 ) {
993 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
994 if !endpoint_sockets.contains_key(endpoint) {
995 endpoint_sockets.insert(endpoint.clone(), Vec::new());
996 }
997
998 let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
999 endpoint_sockets.push((
1000 socket_uuid,
1001 DynamicEndpointProperties {
1002 known_since: Time::now(),
1003 distance,
1004 is_direct,
1005 channel_factor,
1006 direction,
1007 },
1008 ));
1009 }
1010
1011 async fn add_socket(
1016 &self,
1017 socket: Arc<Mutex<ComInterfaceSocket>>,
1018 priority: InterfacePriority,
1019 ) -> Result<(), ComHubError> {
1020 let socket_ref = socket.try_lock().unwrap();
1021 let socket_uuid = socket_ref.uuid.clone();
1022 if self.sockets.borrow().contains_key(&socket_ref.uuid) {
1023 core::panic!("Socket {} already exists in ComHub", socket_ref.uuid);
1024 }
1025
1026 if !socket_ref.can_send() && priority != InterfacePriority::None {
1032 core::panic!(
1033 "Socket {} cannot be used for fallback routing, since it has no send capability",
1034 socket_ref.uuid
1035 );
1036 }
1037 let direction = socket_ref.direction.clone();
1038
1039 self.sockets
1040 .borrow_mut()
1041 .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
1042
1043 if socket_ref.can_send() {
1045 match priority {
1046 InterfacePriority::None => {
1047 }
1049 InterfacePriority::Priority(priority) => {
1050 self.add_fallback_socket(&socket_uuid, priority, direction);
1052 }
1053 }
1054
1055 let mut block: DXBBlock = DXBBlock::default();
1057 block
1058 .block_header
1059 .flags_and_timestamp
1060 .set_block_type(BlockType::Hello);
1061 block
1062 .routing_header
1063 .flags
1064 .set_signature_type(SignatureType::Unencrypted);
1065 let block = self.prepare_own_block(block).await?;
1068
1069 drop(socket_ref);
1070 self.send_block_to_endpoints_via_socket(
1071 block,
1072 &socket_uuid,
1073 &[Endpoint::ANY],
1074 None,
1075 );
1076 }
1077 Ok(())
1078 }
1079
1080 fn add_fallback_socket(
1084 &self,
1085 socket_uuid: &ComInterfaceSocketUUID,
1086 priority: u16,
1087 direction: InterfaceDirection,
1088 ) {
1089 let mut fallback_sockets = self.fallback_sockets.borrow_mut();
1091 fallback_sockets.push((socket_uuid.clone(), priority, direction));
1092 fallback_sockets.sort_by_key(|(_, priority, direction)| {
1095 let dir_rank = match direction {
1096 InterfaceDirection::InOut => 0,
1097 InterfaceDirection::Out => 1,
1098 InterfaceDirection::In => {
1099 core::panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
1100 }
1101 };
1102 (dir_rank, core::cmp::Reverse(*priority))
1103 });
1104 }
1105
1106 fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
1108 self.sockets.borrow_mut().remove(socket_uuid).or_else(|| {
1109 core::panic!("Socket {socket_uuid} not found in ComHub")
1110 });
1111
1112 self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
1115 sockets.retain(|(uuid, _)| uuid != socket_uuid);
1116 !sockets.is_empty()
1117 });
1118
1119 self.fallback_sockets
1121 .borrow_mut()
1122 .retain(|(uuid, _, _)| uuid != socket_uuid);
1123 }
1124
1125 fn add_socket_endpoint(
1127 &self,
1128 socket_uuid: &ComInterfaceSocketUUID,
1129 endpoint: Endpoint,
1130 ) {
1131 core::assert!(
1132 self.sockets.borrow().contains_key(socket_uuid),
1133 "Socket not found in ComHub"
1134 );
1135 self.sockets
1137 .borrow_mut()
1138 .get_mut(socket_uuid)
1139 .unwrap()
1140 .1
1141 .insert(endpoint.clone());
1142 }
1143
1144 fn sort_sockets(&self, endpoint: &Endpoint) {
1154 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
1155 let sockets = endpoint_sockets.get_mut(endpoint).unwrap();
1156
1157 sockets.sort_by(|(_, a), (_, b)| {
1158 b.is_direct
1160 .cmp(&a.is_direct)
1161 .then_with(|| b.channel_factor.cmp(&a.channel_factor))
1162 .then_with(|| b.distance.cmp(&a.distance))
1163 .then_with(
1164 || {
1165 cfg_if::cfg_if! {
1166 if #[cfg(feature = "debug")] {
1167 use crate::runtime::global_context::get_global_context;
1168 use core::cmp::Ordering;
1169 if get_global_context().debug_flags.enable_deterministic_behavior {
1170 Ordering::Equal
1171 }
1172 else {
1173 b.known_since.cmp(&a.known_since)
1174 }
1175 }
1176 else {
1177 b.known_since.cmp(&a.known_since)
1178 }
1179 }
1180 }
1181 )
1182
1183 });
1184 }
1185
1186 pub(crate) fn get_socket_by_uuid(
1190 &self,
1191 socket_uuid: &ComInterfaceSocketUUID,
1192 ) -> Arc<Mutex<ComInterfaceSocket>> {
1193 self.sockets
1194 .borrow()
1195 .get(socket_uuid)
1196 .map(|socket| socket.0.clone())
1197 .unwrap_or_else(|| {
1198 core::panic!("Socket for uuid {socket_uuid} not found")
1199 })
1200 }
1201
1202 pub(crate) fn get_com_interface_by_uuid(
1206 &self,
1207 interface_uuid: &ComInterfaceUUID,
1208 ) -> Rc<RefCell<dyn ComInterface>> {
1209 self.interfaces
1210 .borrow()
1211 .get(interface_uuid)
1212 .unwrap_or_else(|| {
1213 core::panic!("Interface for uuid {interface_uuid} not found")
1214 })
1215 .0
1216 .clone()
1217 }
1218
1219 pub(crate) fn get_com_interface_from_socket_uuid(
1223 &self,
1224 socket_uuid: &ComInterfaceSocketUUID,
1225 ) -> Rc<RefCell<dyn ComInterface>> {
1226 let socket = self.get_socket_by_uuid(socket_uuid);
1227 let socket = socket.try_lock().unwrap();
1228 self.get_com_interface_by_uuid(&socket.interface_uuid)
1229 }
1230
1231 fn iterate_endpoint_sockets<'a>(
1235 &'a self,
1236 endpoint: &'a Endpoint,
1237 options: EndpointIterateOptions<'a>,
1238 ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1239 core::iter::from_coroutine(
1240 #[coroutine]
1241 move || {
1242 let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1243 let endpoint_sockets =
1245 endpoint_sockets_borrow.get(endpoint).cloned();
1246 if endpoint_sockets.is_none() {
1247 return;
1248 }
1249 for (socket_uuid, _) in endpoint_sockets.unwrap() {
1250 {
1251 let socket = self.get_socket_by_uuid(&socket_uuid);
1252 let socket = socket.try_lock().unwrap();
1253
1254 if options.only_direct
1256 && socket.direct_endpoint.is_some()
1257 && socket.direct_endpoint.as_ref().unwrap()
1258 == endpoint
1259 {
1260 debug!(
1261 "No direct socket found for endpoint {endpoint}. Skipping..."
1262 );
1263 continue;
1264 }
1265
1266 if options.exclude_sockets.contains(&socket.uuid) {
1268 debug!(
1269 "Socket {} is excluded for endpoint {}. Skipping...",
1270 socket.uuid, endpoint
1271 );
1272 continue;
1273 }
1274
1275 if !socket.can_send() {
1280 info!(
1281 "Socket {} is not outgoing for endpoint {}. Skipping...",
1282 socket.uuid, endpoint
1283 );
1284 return;
1285 }
1286 }
1287
1288 debug!(
1289 "Found matching socket {socket_uuid} for endpoint {endpoint}"
1290 );
1291 yield socket_uuid.clone()
1292 }
1293 },
1294 )
1295 }
1296
1297 fn find_known_endpoint_socket(
1299 &self,
1300 endpoint: &Endpoint,
1301 exclude_socket: &[ComInterfaceSocketUUID],
1302 ) -> Option<ComInterfaceSocketUUID> {
1303 match endpoint.instance {
1304 EndpointInstance::Any => {
1306 let options = EndpointIterateOptions {
1307 only_direct: false,
1308 exact_instance: false,
1309 exclude_sockets: exclude_socket,
1310 };
1311 if let Some(socket) =
1312 self.iterate_endpoint_sockets(endpoint, options).next()
1313 {
1314 return Some(socket);
1315 }
1316 None
1317 }
1318
1319 EndpointInstance::Instance(_) => {
1321 let options = EndpointIterateOptions {
1323 only_direct: false,
1324 exact_instance: true,
1325 exclude_sockets: exclude_socket,
1326 };
1327 if let Some(socket) =
1328 self.iterate_endpoint_sockets(endpoint, options).next()
1329 {
1330 return Some(socket);
1331 }
1332 None
1333 }
1334 EndpointInstance::All => {
1336 core::todo!("#186 Undescribed by author.")
1337 }
1338 }
1339 }
1340 fn find_best_endpoint_socket(
1344 &self,
1345 endpoint: &Endpoint,
1346 exclude_sockets: &[ComInterfaceSocketUUID],
1347 ) -> Option<ComInterfaceSocketUUID> {
1348 if endpoint == &self.endpoint
1351 && let Some(socket) = self
1352 .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1353 {
1354 return Some(socket);
1355 }
1356
1357 let matching_socket =
1359 self.find_known_endpoint_socket(endpoint, exclude_sockets);
1360
1361 if matching_socket.is_some() {
1363 matching_socket
1364 }
1365 else {
1367 let sockets = self.fallback_sockets.borrow();
1368 for (socket_uuid, _, _) in sockets.iter() {
1369 let socket = self.get_socket_by_uuid(socket_uuid);
1370 info!(
1371 "{}: Find best for {}: {} ({}); excluded:{}",
1372 self.endpoint,
1373 endpoint,
1374 socket_uuid,
1375 socket
1376 .try_lock()
1377 .unwrap()
1378 .direct_endpoint
1379 .clone()
1380 .map(|e| e.to_string())
1381 .unwrap_or("None".to_string()),
1382 exclude_sockets.contains(socket_uuid)
1383 );
1384 if !exclude_sockets.contains(socket_uuid) {
1385 return Some(socket_uuid.clone());
1386 }
1387 }
1388 None
1389 }
1390 }
1391
1392 fn get_outbound_receiver_groups(
1395 &self,
1396 block: &DXBBlock,
1399 mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1400 ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1401 let receivers = block.receiver_endpoints();
1402
1403 if !receivers.is_empty() {
1404 let endpoint_sockets = receivers
1405 .iter()
1406 .map(|e| {
1407 if let Some(blacklist) =
1409 self.endpoint_sockets_blacklist.borrow().get(e)
1410 {
1411 exclude_sockets.extend(blacklist.iter().cloned());
1412 }
1413 let socket =
1414 self.find_best_endpoint_socket(e, &exclude_sockets);
1415 (socket, e)
1416 })
1417 .group_by(|(socket, _)| socket.clone())
1418 .into_iter()
1419 .map(|(socket, group)| {
1420 let endpoints = group
1421 .map(|(_, endpoint)| endpoint.clone())
1422 .collect::<Vec<_>>();
1423 (socket, endpoints)
1424 })
1425 .collect::<Vec<_>>();
1426 Some(endpoint_sockets)
1427 } else {
1428 None
1429 }
1430 }
1431
1432 pub fn _start_update_loop(self_rc: Rc<Self>) {
1438 if *self_rc.update_loop_running.borrow() {
1440 return;
1441 }
1442
1443 *self_rc.update_loop_running.borrow_mut() = true;
1445
1446 spawn_with_panic_notify(
1447 &self_rc.clone().async_context,
1448 update_loop_task(self_rc),
1449 );
1450 }
1451
1452 pub async fn update(&self) {
1457 self.update_interfaces();
1459
1460 self.update_sockets().await;
1462
1463 self.collect_incoming_data().await;
1465
1466 self.receive_incoming_blocks().await;
1468
1469 self.flush_outgoing_blocks();
1471 }
1472
1473 pub async fn prepare_own_block(
1477 &self,
1478 mut block: DXBBlock,
1479 ) -> Result<DXBBlock, ComHubError> {
1480 cfg_if::cfg_if! {
1482 if #[cfg(feature = "native_crypto")] {
1483 use crate::runtime::global_context::get_global_context;
1484 match block.routing_header.flags.signature_type() {
1485 SignatureType::Encrypted => {
1486 let crypto = get_global_context().crypto;
1487 let (pub_key, pri_key) = crypto.gen_ed25519()
1488 .await
1489 .map_err(|_| ComHubError::SignatureError)?;
1490
1491 let raw_signed = [
1492 pub_key.clone(),
1493 block.body.clone()
1494 ]
1495 .concat();
1496 let hashed_signed = crypto
1497 .hash_sha256(&raw_signed)
1498 .await
1499 .map_err(|_| ComHubError::SignatureError)?;
1500
1501 let signature = crypto
1502 .sig_ed25519(&pri_key, &hashed_signed)
1503 .await
1504 .map_err(|_| ComHubError::SignatureError)?;
1505 let hash = crypto
1506 .hkdf_sha256(&pub_key, &[0u8; 16])
1507 .await
1508 .map_err(|_| ComHubError::SignatureError)?;
1509 let enc_sig = crypto
1510 .aes_ctr_encrypt(&hash, &[0u8; 16], &signature)
1511 .await
1512 .map_err(|_| ComHubError::SignatureError)?;
1513 block.signature =
1515 Some([enc_sig.to_vec(), pub_key].concat());
1516 }
1517 SignatureType::Unencrypted => {
1518 let crypto = get_global_context().crypto;
1519 let (pub_key, pri_key) = crypto.gen_ed25519()
1520 .await
1521 .map_err(|_| ComHubError::SignatureError)?;
1522
1523 let raw_signed = [
1524 pub_key.clone(),
1525 block.body.clone()
1526 ]
1527 .concat();
1528 let hashed_signed = crypto
1529 .hash_sha256(&raw_signed)
1530 .await
1531 .map_err(|_| ComHubError::SignatureError)?;
1532
1533 let signature = crypto
1534 .sig_ed25519(&pri_key, &hashed_signed)
1535 .await
1536 .map_err(|_| ComHubError::SignatureError)?;
1537 block.signature =
1539 Some([signature.to_vec(), pub_key].concat());
1540 }
1541 SignatureType::None => {
1542 }
1544 }
1545 }
1546 }
1547
1548 let now = Time::now();
1549 block.routing_header.sender = self.endpoint.clone();
1550 block
1551 .block_header
1552 .flags_and_timestamp
1553 .set_creation_timestamp(now);
1554
1555 block.routing_header.distance = 1;
1557 Ok(block)
1558 }
1559
1560 pub async fn send_own_block(
1562 &self,
1563 mut block: DXBBlock,
1564 ) -> Result<(), Vec<Endpoint>> {
1565 block = self.prepare_own_block(block).await.map_err(|_| vec![])?;
1566 self.block_handler.add_block_to_history(&block, None);
1568 self.send_block(block, vec![], false)
1569 }
1570
1571 pub async fn send_own_block_await_response(
1576 &self,
1577 block: DXBBlock,
1578 options: ResponseOptions,
1579 ) -> Vec<Result<Response, ResponseError>> {
1580 let context_id = block.block_header.context_id;
1581 let section_index = block.block_header.section_index;
1582
1583 let has_exact_receiver_count = block.has_exact_receiver_count();
1584 let receivers = block.receiver_endpoints();
1585
1586 let res = self.send_own_block(block).await;
1587 let failed_endpoints = res.err().unwrap_or_default();
1588
1589 #[cfg(feature = "tokio_runtime")]
1591 yield_now().await;
1592
1593 let timeout = options
1594 .timeout
1595 .unwrap_or_default(self.options.default_receive_timeout);
1596
1597 if has_exact_receiver_count {
1599 if (options.resolution_strategy
1601 == ResponseResolutionStrategy::ReturnOnAnyError
1602 || options.resolution_strategy
1603 == ResponseResolutionStrategy::ReturnOnFirstResult)
1604 && !failed_endpoints.is_empty()
1605 {
1606 return receivers
1608 .iter()
1609 .map(|receiver| {
1610 if failed_endpoints.contains(receiver) {
1611 Err(ResponseError::NotReachable(receiver.clone()))
1612 } else {
1613 Err(ResponseError::EarlyAbort(receiver.clone()))
1614 }
1615 })
1616 .collect::<Vec<_>>();
1617 }
1618
1619 let mut responses = HashMap::new();
1621 let mut missing_response_count = receivers.len();
1622 for receiver in &receivers {
1623 responses.insert(
1624 receiver.clone(),
1625 if failed_endpoints.contains(receiver) {
1626 Err(ResponseError::NotReachable(receiver.clone()))
1627 } else {
1628 Err(ResponseError::NoResponseAfterTimeout(
1629 receiver.clone(),
1630 timeout,
1631 ))
1632 },
1633 );
1634 }
1635 missing_response_count -= failed_endpoints.len();
1637
1638 info!(
1639 "Waiting for responses from receivers {}",
1640 receivers
1641 .iter()
1642 .map(|e| e.to_string())
1643 .collect::<Vec<_>>()
1644 .join(",")
1645 );
1646
1647 let mut rx = self
1648 .block_handler
1649 .register_incoming_block_observer(context_id, section_index);
1650
1651 let res = task::timeout(timeout, async {
1652 while let Some(section) = rx.next().await {
1653 let mut received_response = false;
1654 let mut sender = section.get_sender();
1656 if let Some(response) = responses.get_mut(&sender) {
1658 if response.is_err() {
1660 *response = Ok(Response::ExactResponse(sender.clone(), section));
1661 missing_response_count -= 1;
1662 info!("Received expected response from {sender}");
1663 received_response = true;
1664 }
1665 else {
1667 error!("Received multiple responses from the same sender: {sender}");
1668 }
1669 }
1670 else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1672 info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1673 sender = sender.any_instance_endpoint();
1674 if response.is_err() {
1676 *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1677 missing_response_count -= 1;
1678 received_response = true;
1679 }
1680 else {
1682 info!("Received multiple resolved responses from the {}", &sender);
1683 }
1684 }
1685 else {
1687 error!("Received response from unexpected sender: {}", &sender);
1688 }
1689
1690 if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1692 for (receiver, response) in responses.iter_mut() {
1694 if receiver != &sender {
1695 *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1696 }
1697 }
1698 break;
1699 }
1700
1701 if missing_response_count == 0 {
1703 break;
1704 }
1705 }
1706 }).await;
1707
1708 if res.is_err() {
1709 error!("Timeout waiting for responses");
1710 }
1711
1712 responses.into_values().collect::<Vec<_>>()
1714 }
1715 else {
1717 let mut responses = vec![];
1718
1719 let res = task::timeout(timeout, async {
1720 let mut rx =
1721 self.block_handler.register_incoming_block_observer(
1722 context_id,
1723 section_index,
1724 );
1725 while let Some(section) = rx.next().await {
1726 let sender = section.get_sender();
1728 info!("Received response from {sender}");
1729 responses.push(Ok(Response::UnspecifiedResponse(section)));
1731
1732 if options.resolution_strategy
1734 == ResponseResolutionStrategy::ReturnOnFirstResult
1735 {
1736 break;
1737 }
1738 }
1739 })
1740 .await;
1741
1742 if res.is_err() {
1743 info!("Timeout waiting for responses");
1744 }
1745
1746 responses
1747 }
1748 }
1749
1750 pub fn send_block(
1757 &self,
1758 mut block: DXBBlock,
1759 exclude_sockets: Vec<ComInterfaceSocketUUID>,
1760 forked: bool,
1761 ) -> Result<(), Vec<Endpoint>> {
1762 let outbound_receiver_groups =
1763 self.get_outbound_receiver_groups(&block, exclude_sockets);
1764
1765 if outbound_receiver_groups.is_none() {
1766 error!("No outbound receiver groups found for block");
1767 return Err(vec![]);
1768 }
1769
1770 let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1771
1772 let mut unreachable_endpoints = vec![];
1773
1774 let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1779 Some(0)
1780 } else {
1781 None
1782 };
1783
1784 block.set_bounce_back(false);
1785
1786 for (receiver_socket, endpoints) in outbound_receiver_groups {
1787 if let Some(socket_uuid) = receiver_socket {
1788 self.send_block_to_endpoints_via_socket(
1789 block.clone(),
1790 &socket_uuid,
1791 &endpoints,
1792 fork_count,
1793 );
1794 } else {
1795 error!(
1796 "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1797 self.endpoint,
1798 endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1799 );
1800 unreachable_endpoints.extend(endpoints);
1801 }
1802 if let Some(count) = fork_count {
1804 fork_count = Some(count + 1);
1805 }
1806 }
1807
1808 if !unreachable_endpoints.is_empty() {
1809 return Err(unreachable_endpoints);
1810 }
1811 Ok(())
1812 }
1813
1814 fn send_block_to_endpoints_via_socket(
1817 &self,
1818 mut block: DXBBlock,
1819 socket_uuid: &ComInterfaceSocketUUID,
1820 endpoints: &[Endpoint],
1821 fork_count: Option<usize>,
1823 ) {
1824 block.set_receivers(endpoints);
1825
1826 if block.is_bounce_back() {
1829 block.routing_header.distance -= 2;
1830 }
1831
1832 match block.block_header.flags_and_timestamp.block_type() {
1834 BlockType::Trace | BlockType::TraceBack => {
1835 let distance = block.routing_header.distance;
1836 let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1837 let bounce_back = block.is_bounce_back();
1838
1839 self.add_hop_to_block_trace_data(
1840 &mut block,
1841 NetworkTraceHop {
1842 endpoint: self.endpoint.clone(),
1843 distance,
1844 socket: NetworkTraceHopSocket::new(
1845 self.get_com_interface_from_socket_uuid(
1846 socket_uuid,
1847 )
1848 .borrow_mut()
1849 .get_properties(),
1850 socket_uuid.clone(),
1851 ),
1852 direction: NetworkTraceHopDirection::Outgoing,
1853 fork_nr: new_fork_nr,
1854 bounce_back,
1855 },
1856 );
1857 }
1858 _ => {}
1859 }
1860
1861 let socket = self.get_socket_by_uuid(socket_uuid);
1862 let mut socket_ref = socket.try_lock().unwrap();
1863
1864 let is_broadcast = endpoints
1865 .iter()
1866 .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1867
1868 if is_broadcast
1869 && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1870 && (direct_endpoint == &self.endpoint
1871 || direct_endpoint == &Endpoint::LOCAL)
1872 {
1873 return;
1874 }
1875 for interceptor in self.outgoing_block_interceptors.borrow().iter() {
1876 interceptor(&block, socket_uuid, endpoints);
1877 }
1878 match &block.to_bytes() {
1879 Ok(bytes) => {
1880 info!(
1881 "Sending block to socket {}: {}",
1882 socket_uuid,
1883 endpoints.iter().map(|e| e.to_string()).join(", ")
1884 );
1885
1886 socket_ref.queue_outgoing_block(bytes);
1888 }
1889 Err(err) => {
1890 error!("Failed to convert block to bytes: {err:?}");
1891 }
1892 }
1893 }
1894
1895 fn update_interfaces(&self) {
1898 let mut to_remove = Vec::new();
1899 for (interface, _) in self.interfaces.borrow().values() {
1900 let uuid = interface.borrow().get_uuid().clone();
1901 let state = interface.borrow().get_state();
1902
1903 if state.is_destroyed() {
1907 info!("Destroying interface on the ComHub {uuid}");
1908 to_remove.push(uuid);
1909 } else if state.is_not_connected()
1910 && interface.borrow_mut().get_properties().shall_reconnect()
1911 {
1912 let interface_rc = interface.clone();
1915 let mut interface = interface.borrow_mut();
1916
1917 let already_connecting =
1918 interface.get_state() == ComInterfaceState::Connecting;
1919
1920 if !already_connecting {
1921 let config = interface.get_properties_mut();
1922
1923 let reconnect_now = match &config.reconnection_config {
1924 ReconnectionConfig::InstantReconnect => true,
1925 ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1926 ReconnectionConfig::check_reconnect_timeout(
1927 config.close_timestamp,
1928 timeout,
1929 )
1930 }
1931 ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1932 timeout,
1933 attempts,
1934 } => {
1935 let max_attempts = attempts;
1936
1937 let attempts = config.reconnect_attempts.unwrap_or(0);
1939 let attempts = attempts + 1;
1940 if attempts > *max_attempts {
1941 to_remove.push(uuid.clone());
1942 return;
1943 }
1944
1945 config.reconnect_attempts = Some(attempts);
1946
1947 ReconnectionConfig::check_reconnect_timeout(
1948 config.close_timestamp,
1949 timeout,
1950 )
1951 }
1952 ReconnectionConfig::NoReconnect => false,
1953 };
1954 if reconnect_now {
1955 debug!("Reconnecting interface {uuid}");
1956 interface.set_state(ComInterfaceState::Connecting);
1957 spawn_with_panic_notify(
1958 &self.async_context,
1959 reconnect_interface_task(interface_rc),
1960 );
1961 } else {
1962 debug!("Not reconnecting interface {uuid}");
1963 }
1964 }
1965 }
1966 }
1967
1968 for uuid in to_remove {
1969 self.cleanup_interface(uuid);
1970 }
1971 }
1972
1973 async fn update_sockets(&self) {
1976 let mut new_sockets = Vec::new();
1977 let mut deleted_sockets = Vec::new();
1978 let mut registered_sockets = Vec::new();
1979
1980 for (interface, priority) in self.interfaces.borrow().values() {
1981 let socket_updates = interface.clone().borrow().get_sockets();
1982 let mut socket_updates = socket_updates.try_lock().unwrap();
1983
1984 registered_sockets
1985 .extend(socket_updates.socket_registrations.drain(..));
1986 new_sockets.extend(
1987 socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1988 );
1989 deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1990 }
1991
1992 for (socket, priority) in new_sockets {
1993 self.add_socket(socket.clone(), priority).await;
1994 }
1995 for socket_uuid in deleted_sockets {
1996 self.delete_socket(&socket_uuid);
1997 }
1998 for (socket_uuid, distance, endpoint) in registered_sockets {
1999 let socket = self.get_socket_by_uuid(&socket_uuid);
2000 self.register_socket_endpoint(socket, endpoint.clone(), distance)
2001 .unwrap_or_else(|e| {
2002 error!(
2003 "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
2004 );
2005 });
2006 }
2007 }
2008
2009 async fn collect_incoming_data(&self) {
2012 for (socket, _) in self.sockets.borrow().values() {
2014 let mut socket_ref = socket.try_lock().unwrap();
2015 socket_ref.collect_incoming_data().await;
2016 }
2017 }
2018
2019 async fn receive_incoming_blocks(&self) {
2022 let mut blocks = vec![];
2023 for (socket, _) in self.sockets.borrow().values() {
2025 let mut socket_ref = socket.try_lock().unwrap();
2026 let uuid = socket_ref.uuid.clone();
2027 let block_queue = socket_ref.get_incoming_block_queue();
2028 blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
2029 }
2030
2031 for (uuid, blocks) in blocks {
2032 for block in blocks.iter() {
2033 self.receive_block(block, uuid.clone()).await;
2034 }
2035 }
2036 }
2037
2038 fn flush_outgoing_blocks(&self) {
2040 let interfaces = self.interfaces.borrow();
2041 for (interface, _) in interfaces.values() {
2042 com_interface::flush_outgoing_blocks(
2043 interface.clone(),
2044 &self.async_context,
2045 );
2046 }
2047 }
2048}
2049
2050#[derive(Default, PartialEq, Debug)]
2051pub enum ResponseResolutionStrategy {
2052 #[default]
2059 ReturnAfterAllSettled,
2060
2061 ReturnOnAnyError,
2069
2070 ReturnOnFirstResponse,
2073
2074 ReturnOnFirstResult,
2077}
2078
2079#[derive(Default, Debug)]
2080pub enum ResponseTimeout {
2081 #[default]
2082 Default,
2083 Custom(Duration),
2084}
2085
2086impl ResponseTimeout {
2087 pub fn unwrap_or_default(self, default: Duration) -> Duration {
2088 match self {
2089 ResponseTimeout::Default => default,
2090 ResponseTimeout::Custom(timeout) => timeout,
2091 }
2092 }
2093}
2094
2095#[derive(Default, Debug)]
2096pub struct ResponseOptions {
2097 pub resolution_strategy: ResponseResolutionStrategy,
2098 pub timeout: ResponseTimeout,
2099}
2100
2101impl ResponseOptions {
2102 pub fn new_with_resolution_strategy(
2103 resolution_strategy: ResponseResolutionStrategy,
2104 ) -> Self {
2105 Self {
2106 resolution_strategy,
2107 ..ResponseOptions::default()
2108 }
2109 }
2110
2111 pub fn new_with_timeout(timeout: Duration) -> Self {
2112 Self {
2113 timeout: ResponseTimeout::Custom(timeout),
2114 ..ResponseOptions::default()
2115 }
2116 }
2117}
2118
2119#[derive(Debug)]
2120pub enum Response {
2121 ExactResponse(Endpoint, IncomingSection),
2122 ResolvedResponse(Endpoint, IncomingSection),
2123 UnspecifiedResponse(IncomingSection),
2124}
2125
2126impl Response {
2127 pub fn take_incoming_section(self) -> IncomingSection {
2128 match self {
2129 Response::ExactResponse(_, section) => section,
2130 Response::ResolvedResponse(_, section) => section,
2131 Response::UnspecifiedResponse(section) => section,
2132 }
2133 }
2134}
2135
2136#[derive(Debug)]
2137pub enum ResponseError {
2138 NoResponseAfterTimeout(Endpoint, Duration),
2139 NotReachable(Endpoint),
2140 EarlyAbort(Endpoint),
2141}
2142
2143impl Display for ResponseError {
2144 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
2145 match self {
2146 ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
2147 core::write!(
2148 f,
2149 "No response after timeout ({}s) for endpoint {}",
2150 duration.as_secs(),
2151 endpoint
2152 )
2153 }
2154 ResponseError::NotReachable(endpoint) => {
2155 core::write!(f, "Endpoint {endpoint} is not reachable")
2156 }
2157 ResponseError::EarlyAbort(endpoint) => {
2158 core::write!(f, "Early abort for endpoint {endpoint}")
2159 }
2160 }
2161 }
2162}