datex_core/network/
com_hub.rs

1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::{
3    ReceiverEndpoints, SignatureType,
4};
5use crate::runtime::global_context::get_global_context;
6use crate::stdlib::{cell::RefCell, rc::Rc};
7use crate::task::{self, sleep, spawn_with_panic_notify};
8
9use futures::channel::oneshot::Sender;
10use futures_util::StreamExt;
11use itertools::Itertools;
12use log::{debug, error, info, warn};
13use std::cmp::{Ordering, PartialEq};
14use std::collections::{HashMap, HashSet};
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18#[cfg(feature = "tokio_runtime")]
19use tokio::task::yield_now;
20// FIXME #175 no-std
21
22use super::com_interfaces::com_interface::{
23    self, ComInterfaceError, ComInterfaceState
24};
25use super::com_interfaces::{
26    com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
27};
28use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
29use crate::global::dxb_block::{DXBBlock, IncomingSection};
30use crate::network::block_handler::{BlockHandler, BlockHistoryData};
31use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
32use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
33use crate::network::com_interfaces::com_interface_properties::{
34    InterfaceDirection, ReconnectionConfig,
35};
36use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
37use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
38use crate::values::value_container::ValueContainer;
39
/// Metadata stored next to a socket UUID in the per-endpoint socket list
/// (`ComHub::endpoint_sockets`).
#[derive(Debug, Clone)]
pub struct DynamicEndpointProperties {
    /// Timestamp taken from the global context clock when the socket was
    /// registered for the endpoint.
    pub known_since: u64,
    /// Routing distance supplied at registration time.
    pub distance: i8,
    /// `true` if the endpoint equals the socket's `direct_endpoint`.
    pub is_direct: bool,
    /// Channel factor copied from the socket at registration time.
    pub channel_factor: u32,
    /// Direction copied from the socket at registration time.
    pub direction: InterfaceDirection,
}
48
/// Factory function that builds a new interface instance from setup data.
///
/// Factories are stored in `ComHub::interface_factories` and invoked by
/// `ComHub::create_interface`; a failure is reported as `ComInterfaceError`.
pub type ComInterfaceFactoryFn =
    fn(
        setup_data: ValueContainer,
    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
53
/// Configuration options of a `ComHub`.
#[derive(Debug)]
pub struct ComHubOptions {
    /// Default receive timeout.
    /// NOTE(review): not read anywhere in the visible part of this file.
    default_receive_timeout: Duration,
}

impl Default for ComHubOptions {
    /// Returns options with a receive timeout of five seconds.
    fn default() -> Self {
        let default_receive_timeout = Duration::from_secs(5);
        Self {
            default_receive_timeout,
        }
    }
}
66
/// Central communication hub: keeps track of interfaces, their sockets and
/// the endpoints reachable over them, and routes incoming blocks.
///
/// All mutable state is wrapped in `RefCell`, so the hub is mutated through
/// shared references.
pub struct ComHub {
    /// the runtime endpoint of the hub (@me)
    pub endpoint: Endpoint,

    /// ComHub configuration options
    pub options: ComHubOptions,

    /// a list of all available interface factories, keyed by their interface type
    pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,

    /// a list of all available interfaces, keyed by their UUID
    /// each entry also stores the priority the interface was added with
    pub interfaces: RefCell<
        HashMap<
            ComInterfaceUUID,
            (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
        >,
    >,

    /// a list of all available sockets, keyed by their UUID
    /// contains the socket itself and a list of endpoints currently associated with it
    pub sockets: RefCell<
        HashMap<
            ComInterfaceSocketUUID,
            (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
        >,
    >,

    /// a blacklist of sockets that are not allowed to be used for a specific endpoint
    /// (filled in `redirect_block` when a block that is already in the
    /// history comes in again)
    pub endpoint_sockets_blacklist:
        RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,

    /// fallback sockets that are used if no direct endpoint reachable socket is available
    /// sorted by priority
    pub fallback_sockets:
        RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,

    /// a list of all available sockets for each endpoint, with additional
    /// DynamicEndpointProperties metadata
    pub endpoint_sockets: RefCell<
        HashMap<
            Endpoint,
            Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
        >,
    >,

    /// set to true if the update loop should be running
    /// when set to false, the update loop will stop
    update_loop_running: RefCell<bool>,
    /// sender half of a oneshot channel, `None` by default
    /// NOTE(review): presumably used to signal the update loop to stop;
    /// its usage is outside the visible part of this file
    update_loop_stop_sender: RefCell<Option<Sender<()>>>,

    /// block handler: keeps the block history and receives blocks addressed
    /// to this endpoint
    pub block_handler: BlockHandler,
}
119
120impl Debug for ComHub {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("ComHub")
123            .field("endpoint", &self.endpoint)
124            .field("options", &self.options)
125            .field("sockets", &self.sockets)
126            .field("endpoint_sockets_blacklist", &self.endpoint_sockets_blacklist)
127            .field("fallback_sockets", &self.fallback_sockets)
128            .field("endpoint_sockets", &self.endpoint_sockets)
129            .finish()
130    }
131}
132
/// Options for iterating over the sockets of an endpoint.
///
/// NOTE(review): the consumer of this struct is outside the visible part of
/// this file; the field descriptions below are inferred from the field names
/// and should be confirmed against it.
#[derive(Debug, Clone, Default)]
struct EndpointIterateOptions<'a> {
    /// presumably restricts the iteration to direct sockets
    pub only_direct: bool,
    /// presumably requires the endpoint instance to match exactly
    pub exact_instance: bool,
    /// presumably the sockets to skip during iteration
    pub exclude_sockets: &'a [ComInterfaceSocketUUID],
}
139
140impl Default for ComHub {
141    fn default() -> Self {
142        ComHub {
143            endpoint: Endpoint::default(),
144            options: ComHubOptions::default(),
145            interface_factories: RefCell::new(HashMap::new()),
146            interfaces: RefCell::new(HashMap::new()),
147            endpoint_sockets: RefCell::new(HashMap::new()),
148            block_handler: BlockHandler::new(),
149            sockets: RefCell::new(HashMap::new()),
150            fallback_sockets: RefCell::new(Vec::new()),
151            endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
152            update_loop_running: RefCell::new(false),
153            update_loop_stop_sender: RefCell::new(None),
154        }
155    }
156}
157
/// Fallback routing priority of an interface.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum InterfacePriority {
    /// The interface is never used for fallback routing, even if no other
    /// interface is available.
    /// Intended for interfaces which cannot communicate with the outside
    /// world or cannot redirect large amounts of data.
    None,
    /// The interface is used for fallback routing if no other interface is
    /// available, ordered by the given priority.
    /// A higher number means a higher priority.
    Priority(u16),
}

impl Default for InterfacePriority {
    /// The default is fallback routing with the lowest priority (`0`).
    fn default() -> Self {
        Self::Priority(0)
    }
}
175
/// Errors returned by `ComHub` operations.
///
/// NOTE(review): `InterfaceCloseFailed`, `InterfaceNotConnected` and
/// `NoResponse` are not produced in the visible part of this file.
#[derive(Debug, Clone, PartialEq)]
pub enum ComHubError {
    /// An interface factory returned an error.
    InterfaceError(ComInterfaceError),
    InterfaceCloseFailed,
    InterfaceNotConnected,
    /// No interface is registered under the given UUID.
    InterfaceDoesNotExist,
    /// An interface with the same UUID is already registered.
    InterfaceAlreadyExists,
    /// No factory is registered for the requested interface type.
    InterfaceTypeDoesNotExist,
    /// A fallback priority was requested for an interface whose direction is
    /// `InterfaceDirection::In`.
    InvalidInterfaceDirectionForFallbackInterface,
    NoResponse,
    /// Opening the interface failed.
    InterfaceOpenError,
}
188
/// Errors returned when registering an endpoint for a socket.
#[derive(Debug)]
pub enum SocketEndpointRegistrationError {
    /// The socket state is not open.
    SocketDisconnected,
    /// NOTE(review): not produced in the visible part of this file.
    SocketUninitialized,
    /// The socket is already listed for the endpoint.
    SocketEndpointAlreadyRegistered,
}
195
196impl ComHub {
197    pub fn new(endpoint: impl Into<Endpoint>) -> ComHub {
198        ComHub {
199            endpoint: endpoint.into(),
200            ..ComHub::default()
201        }
202    }
203
204    pub async fn init(&self) -> Result<(), ComHubError> {
205        // add default local loopback interface
206        let local_interface = LocalLoopbackInterface::new();
207        self.open_and_add_interface(
208            Rc::new(RefCell::new(local_interface)),
209            InterfacePriority::None,
210        )
211        .await
212    }
213
214    /// Registers a new interface factory for a specific interface implementation.
215    /// This allows the ComHub to create new instances of the interface on demand.
216    pub fn register_interface_factory(
217        &self,
218        interface_type: String,
219        factory: ComInterfaceFactoryFn,
220    ) {
221        self.interface_factories
222            .borrow_mut()
223            .insert(interface_type, factory);
224    }
225
226    /// Creates a new interface instance using the registered factory
227    /// for the specified interface type if it exists.
228    /// The interface is opened and added to the ComHub.
229    pub async fn create_interface<'a>(
230        &self,
231        interface_type: &str,
232        setup_data: ValueContainer,
233        priority: InterfacePriority,
234    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
235        info!("creating interface {interface_type}");
236        let interface_factories = self.interface_factories.borrow();
237        if let Some(factory) = interface_factories.get(interface_type) {
238            let interface =
239                factory(setup_data).map_err(ComHubError::InterfaceError)?;
240            drop(interface_factories);
241            
242            self
243                .open_and_add_interface(interface.clone(), priority)
244                .await
245                .map(|_| interface)
246        } else {
247            Err(ComHubError::InterfaceTypeDoesNotExist)
248        }
249    }
250
    /// Tries to convert a type-erased interface handle into a handle of the
    /// concrete interface type `T`.
    ///
    /// Returns `None` if the value behind the trait object is not a `T`.
    /// The interface is briefly borrowed for the type check, so this panics
    /// if it is mutably borrowed at that moment.
    fn try_downcast<T: 'static>(
        input: Rc<RefCell<dyn ComInterface>>,
    ) -> Option<Rc<RefCell<T>>> {
        // Check the concrete type behind the trait object
        if input.borrow().as_any().is::<T>() {
            // SAFETY: the check above confirmed that the value is a `T`, so
            // the allocation behind the `Rc` holds a `RefCell<T>`; the raw
            // pointer comes straight from `Rc::into_raw` and is turned back
            // into an `Rc` exactly once.
            // NOTE(review): this relies on `as_any()` returning the
            // interface itself - confirm for all implementors.
            let ptr = Rc::into_raw(input) as *const RefCell<T>;
            unsafe { Some(Rc::from_raw(ptr)) }
        } else {
            None
        }
    }
263
264    pub fn get_interface_by_uuid<T: ComInterface>(
265        &self,
266        interface_uuid: &ComInterfaceUUID,
267    ) -> Option<Rc<RefCell<T>>> {
268        ComHub::try_downcast(
269            self.interfaces.borrow().get(interface_uuid)?.0.clone(),
270        )
271    }
272
273    pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
274        self.interfaces.borrow().contains_key(interface_uuid)
275    }
276
277    pub fn get_dyn_interface_by_uuid(
278        &self,
279        uuid: &ComInterfaceUUID,
280    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
281        self.interfaces
282            .borrow()
283            .get(uuid)
284            .map(|(interface, _)| interface.clone())
285    }
286
287    pub async fn open_and_add_interface(
288        &self,
289        interface: Rc<RefCell<dyn ComInterface>>,
290        priority: InterfacePriority,
291    ) -> Result<(), ComHubError> {
292        if interface.borrow().get_state() != ComInterfaceState::Connected {
293            // If interface is not connected, open it
294            // and wait for it to be connected
295            // FIXME #240: borrow_mut across await point
296            if !(interface.borrow_mut().handle_open().await) {
297                return Err(ComHubError::InterfaceOpenError);
298            }
299        }
300        self.add_interface(interface.clone(), priority)
301    }
302
303    pub fn add_interface(
304        &self,
305        interface: Rc<RefCell<dyn ComInterface>>,
306        priority: InterfacePriority,
307    ) -> Result<(), ComHubError> {
308        let uuid = interface.borrow().get_uuid().clone();
309        let mut interfaces = self.interfaces.borrow_mut();
310        if interfaces.contains_key(&uuid) {
311            return Err(ComHubError::InterfaceAlreadyExists);
312        }
313
314        // make sure the interface can send if a priority is set
315        if priority != InterfacePriority::None
316            && interface.borrow_mut().get_properties().direction
317                == InterfaceDirection::In
318        {
319            return Err(
320                ComHubError::InvalidInterfaceDirectionForFallbackInterface,
321            );
322        }
323
324        interfaces.insert(uuid, (interface, priority));
325        Ok(())
326    }
327
328    /// User can proactively remove an interface from the hub.
329    /// This will destroy the interface and it's sockets (perform deep cleanup)
330    pub async fn remove_interface(
331        &self,
332        interface_uuid: ComInterfaceUUID,
333    ) -> Result<(), ComHubError> {
334        info!("Removing interface {interface_uuid}");
335        let interface = self
336            .interfaces
337            .borrow_mut()
338            .get_mut(&interface_uuid.clone())
339            .ok_or(ComHubError::InterfaceDoesNotExist)?
340            .0
341            .clone();
342        {
343            // Async close the interface (stop tasks, server, cleanup internal data)
344            // FIXME #176: borrow_mut should not be used here
345            let mut interface = interface.borrow_mut();
346            interface.handle_destroy().await;
347        }
348
349        // Remove old sockets from ComHub that have been deleted by the interface destroy_sockets()
350        self.update_sockets();
351
352        self.cleanup_interface(interface_uuid)
353            .ok_or(ComHubError::InterfaceDoesNotExist)?;
354
355        Ok(())
356    }
357
358    /// The internal cleanup function that removes the interface from the hub
359    /// and disabled the default interface if it was set to this interface
360    fn cleanup_interface(
361        &self,
362        interface_uuid: ComInterfaceUUID,
363    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
364        let interface = self
365            .interfaces
366            .borrow_mut()
367            .remove(&interface_uuid)
368            .or(None)?
369            .0;
370        Some(interface)
371    }
372
    /// Handles a block that came in on the socket `socket_uuid`.
    ///
    /// Steps, in order:
    /// 1. drop the block if `validate_block` rejects it
    /// 2. for a block that is not yet in the history and was not sent by
    ///    this endpoint, register the sender for the incoming socket
    /// 3. if the block addresses this endpoint (or one of the `ANY`
    ///    endpoints), handle it locally - `Hello` blocks are not handled
    /// 4. relay the block to the remaining receivers, except for `Hello`
    ///    blocks addressed to this endpoint
    /// 5. add a new block to the block history
    pub(crate) fn receive_block(
        &self,
        block: &DXBBlock,
        socket_uuid: ComInterfaceSocketUUID,
    ) {
        info!("{} received block: {}", self.endpoint, block);

        // ignore invalid blocks (e.g. invalid signature)
        if !self.validate_block(block) {
            warn!("Block validation failed. Dropping block...");
            return;
        }

        let block_type = block.block_header.flags_and_timestamp.block_type();

        // check whether the block is already known; it is only added to the
        // history at the end of this function
        let is_new_block = !self.block_handler.is_block_in_history(block);
        // assign endpoint to socket if none is assigned
        // only if a new block and the sender is not the local endpoint
        if is_new_block && block.routing_header.sender != self.endpoint {
            self.register_socket_endpoint_from_incoming_block(
                socket_uuid.clone(),
                block,
            );
        }

        // blocks without receivers are neither handled nor relayed
        if let Some(receivers) = &block.routing_header.receivers.endpoints {
            let is_for_own = receivers.endpoints.iter().any(|e| {
                e == &self.endpoint
                    || e == &Endpoint::ANY
                    || e == &Endpoint::ANY_ALL_INSTANCES
            });

            // handle blocks for own endpoint
            if is_for_own && block_type != BlockType::Hello {
                info!("Block is for this endpoint");

                match block_type {
                    BlockType::Trace => {
                        self.handle_trace_block(block, socket_uuid.clone());
                    }
                    BlockType::TraceBack => {
                        self.handle_trace_back_block(
                            block,
                            socket_uuid.clone(),
                        );
                    }
                    _ => {
                        self.block_handler.handle_incoming_block(block.clone());
                    }
                };
            }

            // TODO #177: handle this via TTL, not explicitly for Hello blocks
            let should_relay =
                // don't relay "Hello" blocks sent to own endpoint
                !(
                    is_for_own && block_type == BlockType::Hello
                );

            // relay the block to other endpoints
            if should_relay {
                // get all receivers that the block must be relayed to
                // (only the local endpoint itself is removed from the list)
                let remaining_receivers = if is_for_own {
                    &self.get_remote_receivers(receivers)
                } else {
                    &receivers.endpoints
                };

                // relay the block to all receivers
                if !remaining_receivers.is_empty() {
                    match block_type {
                        BlockType::Trace | BlockType::TraceBack => {
                            self.redirect_trace_block(
                                block.clone_with_new_receivers(
                                    remaining_receivers,
                                ),
                                socket_uuid.clone(),
                                is_for_own,
                            );
                        }
                        _ => {
                            self.redirect_block(
                                block.clone_with_new_receivers(
                                    remaining_receivers,
                                ),
                                socket_uuid.clone(),
                                is_for_own,
                            );
                        }
                    }
                }
            }
        }

        // add to block history
        if is_new_block {
            self.block_handler
                .add_block_to_history(block, Some(socket_uuid));
        }
    }
474
475    /// Returns a list of all receivers from a given ReceiverEndpoints
476    /// excluding the local endpoint
477    fn get_remote_receivers(
478        &self,
479        receiver_endpoints: &ReceiverEndpoints,
480    ) -> Vec<Endpoint> {
481        receiver_endpoints
482            .endpoints
483            .iter()
484            .filter(|e| e != &&self.endpoint)
485            .cloned()
486            .collect::<Vec<_>>()
487    }
488
    /// Registers the socket endpoint from an incoming block
    /// if the endpoint is not already registered for the socket
    ///
    /// The sender of the block is registered for the socket together with
    /// the routing distance of the block. An "already registered" result is
    /// only logged.
    ///
    /// # Panics
    /// Panics if the registration fails for any other reason (e.g. the
    /// socket is not open) or if the socket mutex is poisoned.
    /// NOTE(review): panicking on a socket that is no longer open looks
    /// harsh for a condition caused by the network - confirm this is intended.
    fn register_socket_endpoint_from_incoming_block(
        &self,
        socket_uuid: ComInterfaceSocketUUID,
        block: &DXBBlock,
    ) {
        let socket = self.get_socket_by_uuid(&socket_uuid);
        let mut socket_ref = socket.lock().unwrap();

        let distance = block.routing_header.distance;
        let sender = block.routing_header.sender.clone();

        // set as direct endpoint if the socket has none yet and distance == 1
        // NOTE(review): this comment previously said "distance = 0" while the
        // code checks for 1 - confirm which initial distance is intended
        if socket_ref.direct_endpoint.is_none() && distance == 1 {
            info!(
                "Setting direct endpoint for socket {}: {}",
                socket_ref.uuid, sender
            );
            socket_ref.direct_endpoint = Some(sender.clone());
        }

        // release the lock: register_socket_endpoint locks the socket again
        drop(socket_ref);
        match self.register_socket_endpoint(
            socket.clone(),
            sender.clone(),
            distance,
        ) {
            Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
                debug!(
                    "Socket already registered for endpoint {sender}",
                );
            }
            Err(error) => {
                panic!("Failed to register socket endpoint {sender}: {error:?}");
            },
            Ok(_) => { }
        }
    }
528
529    /// Prepares a block and relays it to the given receivers.
530    /// The routing distance is incremented by 1.
531    pub(crate) fn redirect_block(
532        &self,
533        mut block: DXBBlock,
534        incoming_socket: ComInterfaceSocketUUID,
535        // only for debugging traces
536        forked: bool,
537    ) {
538        let receivers = block.get_receivers();
539
540        // check if block has already passed this endpoint (-> bounced back block)
541        // and add to blacklist for all receiver endpoints
542        let history_block_data =
543            self.block_handler.get_block_data_from_history(&block);
544        if history_block_data.is_some() {
545            for receiver in &receivers {
546                if receiver != &self.endpoint {
547                    info!(
548                        "{}: Adding socket {} to blacklist for receiver {}",
549                        self.endpoint, incoming_socket, receiver
550                    );
551                    self.endpoint_sockets_blacklist
552                        .borrow_mut()
553                        .entry(receiver.clone())
554                        .or_default()
555                        .insert(incoming_socket.clone());
556                }
557            }
558        }
559
560        // increment distance for next hop
561        block.routing_header.distance += 1;
562
563        // TODO #178: ensure ttl is >= 1
564        // decrease TTL by 1
565        block.routing_header.ttl -= 1;
566        // if ttl is 0, drop the block
567        if block.routing_header.ttl == 0 {
568            warn!("Block TTL expired. Dropping block...");
569            return;
570        }
571
572        let mut prefer_incoming_socket_for_bounce_back = false;
573        // if we are the original sender of the block, don't send again (prevent loop) and send
574        // bounce back block with all receivers
575        let res = {
576            if block.routing_header.sender == self.endpoint {
577                // if not bounce back block, directly send back to incoming socket (prevent loop)
578                prefer_incoming_socket_for_bounce_back =
579                    !block.is_bounce_back();
580                Err(receivers.to_vec())
581            } else {
582                let mut excluded_sockets = vec![incoming_socket.clone()];
583                if let Some(BlockHistoryData {
584                    original_socket_uuid: Some(original_socket_uuid),
585                }) = &history_block_data
586                {
587                    excluded_sockets.push(original_socket_uuid.clone())
588                }
589                self.send_block(block.clone(), excluded_sockets, forked)
590            }
591        };
592
593        // send block for unreachable endpoints back to the sender
594        if let Err(unreachable_endpoints) = res {
595            // try to send back to original socket
596            // if already in history, get original socket from history
597            // otherwise, directly send back to the incoming socket
598            let send_back_socket = if !prefer_incoming_socket_for_bounce_back
599                && let Some(history_block_data) = history_block_data
600            {
601                history_block_data.original_socket_uuid
602            } else {
603                Some(incoming_socket.clone())
604            };
605
606            // If a send_back_socket is set, the original block is not from this endpoint,
607            // so we can send it back to the original socket
608            if let Some(send_back_socket) = send_back_socket {
609                // never send a bounce back block back again to the incoming socket
610                if block.is_bounce_back() && send_back_socket == incoming_socket
611                {
612                    warn!("{}: Tried to send bounce back block back to incoming socket, but this is not allowed", self.endpoint);
613                } else if self
614                    .get_socket_by_uuid(&send_back_socket)
615                    .lock()
616                    .unwrap()
617                    .can_send()
618                {
619                    block.set_bounce_back(true);
620                    self.send_block_to_endpoints_via_socket(
621                        block,
622                        &send_back_socket,
623                        &unreachable_endpoints,
624                        if forked { Some(0) } else { None },
625                    )
626                } else {
627                    error!("Tried to send bounce back block, but cannot send back to incoming socket")
628                }
629            }
630            // Otherwise, the block originated from this endpoint, we can just call send again
631            // and try to send it via other remaining sockets that are not on the blacklist for the
632            // block receiver
633            else {
634                self.send_block(block, vec![], forked).unwrap_or_else(|_| {
635                    error!(
636                        "Failed to send out block to {}",
637                        unreachable_endpoints
638                            .iter()
639                            .map(|e| e.to_string())
640                            .join(",")
641                    );
642                });
643            }
644        }
645    }
646
647    fn validate_block(&self, block: &DXBBlock) -> bool {
648        // TODO #179 check for creation time, withdraw if too old (TBD) or in the future
649
650        let is_signed =
651            block.routing_header.flags.signature_type() != SignatureType::None;
652
653        match is_signed {
654            true => {
655                // TODO #180: verify signature and abort if invalid
656                // Check if signature is following in some later block and add them to
657                // a pool of incoming blocks awaiting some signature
658                true
659            }
660            false => {
661                let endpoint = block.routing_header.sender.clone();
662                let is_trusted = {
663                    cfg_if::cfg_if! {
664                        if #[cfg(feature = "debug")] {
665                            get_global_context().debug_flags.allow_unsigned_blocks
666                        }
667                        else {
668                            // TODO #181 Check if the sender is trusted (endpoint + interface) connection
669                            false
670                        }
671                    }
672                };
673                match is_trusted {
674                    true => true,
675                    false => {
676                        warn!("Block received by {endpoint} is not signed. Dropping block...");
677                        false
678                    }
679                }
680            }
681        }
682    }
683
684    /// Registers a new endpoint that is reachable over the socket if the socket is not
685    /// already registered, it will be added to the socket list.
686    /// If the provided endpoint is not the same as the socket endpoint, it is registered
687    /// as an indirect socket to the endpoint
688    pub fn register_socket_endpoint(
689        &self,
690        socket: Arc<Mutex<ComInterfaceSocket>>,
691        endpoint: Endpoint,
692        distance: i8,
693    ) -> Result<(), SocketEndpointRegistrationError> {
694        log::info!(
695            "{} registering endpoint {} for socket {}",
696            self.endpoint,
697            endpoint,
698            socket.lock().unwrap().uuid
699        );
700        let socket_ref = socket.lock().unwrap();
701
702        // if the registered endpoint is the same as the socket endpoint,
703        // this is a direct socket to the endpoint
704        let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
705
706        // cannot register endpoint if socket is not connected
707        if !socket_ref.state.is_open() {
708            return Err(SocketEndpointRegistrationError::SocketDisconnected);
709        }
710
711        // check if the socket is already registered for the endpoint
712        if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
713            && entries
714                .iter()
715                .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
716        {
717            return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
718        }
719
720        let socket_uuid = socket_ref.uuid.clone();
721        let channel_factor = socket_ref.channel_factor;
722        let direction = socket_ref.direction.clone();
723        drop(socket_ref);
724
725        // add endpoint to socket endpoint list
726        self.add_socket_endpoint(&socket_uuid, endpoint.clone());
727
728        // add socket to endpoint socket list
729        self.add_endpoint_socket(
730            &endpoint,
731            socket_uuid,
732            distance,
733            is_direct,
734            channel_factor,
735            direction,
736        );
737
738        // resort sockets for endpoint
739        self.sort_sockets(&endpoint);
740
741        Ok(())
742    }
743
744    /// Waits for all background tasks scheduled by the update() function to finish
745    /// This includes block flushes from `flush_outgoing_blocks()`
746    /// and interface (re)-connections from `update_interfaces()`
747    pub async fn wait_for_update_async(&self) {
748        loop {
749            let mut is_done = true;
750            for interface in self.interfaces.borrow().values() {
751                let interface = interface.0.clone();
752                let interface = interface.borrow_mut();
753                let outgoing_blocks_count =
754                    interface.get_info().outgoing_blocks_count.get();
755                // blocks are still sent out on this interface
756                if outgoing_blocks_count > 0 {
757                    is_done = false;
758                    break;
759                }
760                // interface is still in connection task
761                if interface.get_state() == ComInterfaceState::Connecting {
762                    is_done = false;
763                    break;
764                }
765            }
766            if is_done {
767                break;
768            }
769            sleep(Duration::from_millis(10)).await;
770        }
771    }
772
773    /// Updates all sockets and interfaces,
774    /// collecting incoming data and sending out queued blocks.
775    /// In contrast to the update() function, this function is asynchronous
776    /// and will wait for all background tasks scheduled by the update() function to finish
777    pub async fn update_async(&self) {
778        self.update();
779        self.wait_for_update_async().await;
780    }
781
782    /// Adds a socket to the socket list for a specific endpoint,
783    /// attaching metadata as DynamicEndpointProperties
784    fn add_endpoint_socket(
785        &self,
786        endpoint: &Endpoint,
787        socket_uuid: ComInterfaceSocketUUID,
788        distance: i8,
789        is_direct: bool,
790        channel_factor: u32,
791        direction: InterfaceDirection,
792    ) {
793        let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
794        if !endpoint_sockets.contains_key(endpoint) {
795            endpoint_sockets.insert(endpoint.clone(), Vec::new());
796        }
797
798        let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
799        endpoint_sockets.push((
800            socket_uuid,
801            DynamicEndpointProperties {
802                known_since: get_global_context().time.lock().unwrap().now(),
803                distance,
804                is_direct,
805                channel_factor,
806                direction,
807            },
808        ));
809    }
810
811    /// Adds a socket to the socket list.
812    /// If the priority is not set to `InterfacePriority::None`, the socket
813    /// is also registered as a fallback socket for outgoing connections with the
814    /// specified priority.
815    fn add_socket(
816        &self,
817        socket: Arc<Mutex<ComInterfaceSocket>>,
818        priority: InterfacePriority,
819    ) {
820        let socket_ref = socket.lock().unwrap();
821        let socket_uuid = socket_ref.uuid.clone();
822        if self.sockets.borrow().contains_key(&socket_ref.uuid) {
823            panic!("Socket {} already exists in ComHub", socket_ref.uuid);
824        }
825
826        // info!(
827        //     "Adding socket {} to ComHub with priority {:?}",
828        //     socket_ref.uuid, priority
829        // );
830
831        if !socket_ref.can_send() && priority != InterfacePriority::None {
832            panic!(
833                "Socket {} cannot be used for fallback routing, since it has no send capability",
834                socket_ref.uuid
835            );
836        }
837        let direction = socket_ref.direction.clone();
838
839        self.sockets
840            .borrow_mut()
841            .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
842
843        // add outgoing socket to fallback sockets list if they have a priority flag
844        if socket_ref.can_send() {
845            match priority {
846                InterfacePriority::None => {
847                    // do nothing
848                }
849                InterfacePriority::Priority(priority) => {
850                    // add socket to fallback sockets list
851                    self.add_fallback_socket(&socket_uuid, priority, direction);
852                }
853            }
854
855            // send empty block to socket to say hello
856            let mut block: DXBBlock = DXBBlock::default();
857            block
858                .block_header
859                .flags_and_timestamp
860                .set_block_type(BlockType::Hello);
861            block
862                .routing_header
863                .flags
864                .set_signature_type(SignatureType::Unencrypted);
865            // TODO #182 include fingerprint of the own public key into body
866
867            let block = self.prepare_own_block(block);
868
869            drop(socket_ref);
870            self.send_block_to_endpoints_via_socket(
871                block,
872                &socket_uuid,
873                &[Endpoint::ANY],
874                None,
875            );
876        }
877    }
878
879    /// Registers a socket as a fallback socket for outgoing connections
880    /// that can be used if no known route exists for an endpoint
881    /// Note: only sockets that support sending data should be used as fallback sockets
882    fn add_fallback_socket(
883        &self,
884        socket_uuid: &ComInterfaceSocketUUID,
885        priority: u16,
886        direction: InterfaceDirection,
887    ) {
888        // add to vec
889        let mut fallback_sockets = self.fallback_sockets.borrow_mut();
890        fallback_sockets.push((socket_uuid.clone(), priority, direction));
891        // first sort by direction (InOut before Out - only In is not allowed)
892        // second sort by priority
893        fallback_sockets.sort_by_key(|(_, priority, direction)| {
894            let dir_rank = match direction {
895                InterfaceDirection::InOut => 0,
896                InterfaceDirection::Out => 1,
897                InterfaceDirection::In => {
898                    panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
899                }
900            };
901            (dir_rank, std::cmp::Reverse(*priority))
902        });
903    }
904
905    /// Removes a socket from the socket list
906    fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
907        self.sockets
908            .borrow_mut()
909            .remove(socket_uuid)
910            .or_else(|| panic!("Socket {socket_uuid} not found in ComHub"));
911
912        // remove socket from endpoint socket list
913        // remove endpoint key from endpoint_sockets if not sockets present
914        self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
915            sockets.retain(|(uuid, _)| uuid != socket_uuid);
916            !sockets.is_empty()
917        });
918
919        // remove socket if it is the default socket
920        self.fallback_sockets
921            .borrow_mut()
922            .retain(|(uuid, _, _)| uuid != socket_uuid);
923    }
924
925    /// Adds an endpoint to the endpoint list of a specific socket
926    fn add_socket_endpoint(
927        &self,
928        socket_uuid: &ComInterfaceSocketUUID,
929        endpoint: Endpoint,
930    ) {
931        assert!(
932            self.sockets.borrow().contains_key(socket_uuid),
933            "Socket not found in ComHub"
934        );
935        // add endpoint to socket endpoint list
936        self.sockets
937            .borrow_mut()
938            .get_mut(socket_uuid)
939            .unwrap()
940            .1
941            .insert(endpoint.clone());
942    }
943
944    /// Sorts the sockets for an endpoint:
945    /// - socket with send capability first
946    /// - then direct sockets
947    /// - then sort by channel channel_factor (latency, bandwidth)
948    /// - then sort by socket connect_timestamp
949    /// When the global debug flag `enable_deterministic_behavior` is set,
950    /// Sockets are not sorted by their connect_timestamp to make sure that the order of
951    /// received blocks has no effect on the routing priorities
952    fn sort_sockets(&self, endpoint: &Endpoint) {
953        let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
954        let sockets = endpoint_sockets.get_mut(endpoint).unwrap();
955
956        sockets.sort_by(|(_, a), (_, b)| {
957            // sort by channel_factor
958            b.is_direct
959                .cmp(&a.is_direct)
960                .then_with(|| b.channel_factor.cmp(&a.channel_factor))
961                .then_with(|| b.distance.cmp(&a.distance))
962                .then_with(
963                    || {
964                        cfg_if::cfg_if! {
965                            if #[cfg(feature = "debug")] {
966                                if get_global_context().debug_flags.enable_deterministic_behavior {
967                                    Ordering::Equal
968                                }
969                                else {
970                                    b.known_since.cmp(&a.known_since)
971                                }
972                            }
973                            else {
974                                b.known_since.cmp(&a.known_since)
975                            }
976                        }
977                    }
978                )
979
980        });
981    }
982
983    /// Returns the socket for a given UUID
984    /// The socket must be registered in the ComHub,
985    /// otherwise a panic will be triggered
986    pub(crate) fn get_socket_by_uuid(
987        &self,
988        socket_uuid: &ComInterfaceSocketUUID,
989    ) -> Arc<Mutex<ComInterfaceSocket>> {
990        self.sockets
991            .borrow()
992            .get(socket_uuid)
993            .map(|socket| socket.0.clone())
994            .unwrap_or_else(|| {
995                panic!("Socket for uuid {socket_uuid} not found")
996            })
997    }
998
999    /// Returns the com interface for a given UUID
1000    /// The interface must be registered in the ComHub,
1001    /// otherwise a panic will be triggered
1002    pub(crate) fn get_com_interface_by_uuid(
1003        &self,
1004        interface_uuid: &ComInterfaceUUID,
1005    ) -> Rc<RefCell<dyn ComInterface>> {
1006        self.interfaces
1007            .borrow()
1008            .get(interface_uuid)
1009            .unwrap_or_else(|| {
1010                panic!("Interface for uuid {interface_uuid} not found")
1011            })
1012            .0
1013            .clone()
1014    }
1015
1016    /// Returns the com interface for a given socket UUID
1017    /// The interface and socket must be registered in the ComHub,
1018    /// otherwise a panic will be triggered
1019    pub(crate) fn get_com_interface_from_socket_uuid(
1020        &self,
1021        socket_uuid: &ComInterfaceSocketUUID,
1022    ) -> Rc<RefCell<dyn ComInterface>> {
1023        let socket = self.get_socket_by_uuid(socket_uuid);
1024        let socket = socket.lock().unwrap();
1025        self.get_com_interface_by_uuid(&socket.interface_uuid)
1026    }
1027
1028    /// Returns an iterator over all sockets for a given endpoint
1029    /// The sockets are yielded in the order of their priority, starting with the
1030    /// highest priority socket (the best socket for sending data to the endpoint)
1031    fn iterate_endpoint_sockets<'a>(
1032        &'a self,
1033        endpoint: &'a Endpoint,
1034        options: EndpointIterateOptions<'a>,
1035    ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1036        std::iter::from_coroutine(
1037            #[coroutine]
1038            move || {
1039                let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1040                // TODO #183: can we optimize this to avoid cloning the endpoint_sockets vector?
1041                let endpoint_sockets =
1042                    endpoint_sockets_borrow.get(endpoint).cloned();
1043                if endpoint_sockets.is_none() {
1044                    return;
1045                }
1046                for (socket_uuid, _) in endpoint_sockets.unwrap() {
1047                    {
1048                        let socket = self.get_socket_by_uuid(&socket_uuid);
1049                        let socket = socket.lock().unwrap();
1050
1051                        // check if only_direct is set and the endpoint equals the direct endpoint of the socket
1052                        if options.only_direct
1053                            && socket.direct_endpoint.is_some()
1054                            && socket.direct_endpoint.as_ref().unwrap()
1055                                == endpoint
1056                        {
1057                            debug!(
1058                                "No direct socket found for endpoint {endpoint}. Skipping..."
1059                            );
1060                            continue;
1061                        }
1062
1063                        // check if the socket is excluded if exclude_socket is set
1064                        if options.exclude_sockets.contains(&socket.uuid) {
1065                            debug!(
1066                                    "Socket {} is excluded for endpoint {}. Skipping...",
1067                                    socket.uuid,
1068                                    endpoint
1069                                );
1070                            continue;
1071                        }
1072
1073                        // TODO #184 optimize and separate outgoing/non-outgoing sockets for endpoint
1074                        // only yield outgoing sockets
1075                        // if a non-outgoing socket is found, all following sockets
1076                        // will also be non-outgoing
1077                        if !socket.can_send() {
1078                            info!(
1079                                "Socket {} is not outgoing for endpoint {}. Skipping...",
1080                                socket.uuid,
1081                                endpoint
1082                            );
1083                            return;
1084                        }
1085                    }
1086
1087                    debug!(
1088                        "Found matching socket {socket_uuid} for endpoint {endpoint}"
1089                    );
1090                    yield socket_uuid.clone()
1091                }
1092            },
1093        )
1094    }
1095
1096    /// Finds the best matching socket over which an endpoint is known to be reachable.
1097    fn find_known_endpoint_socket(
1098        &self,
1099        endpoint: &Endpoint,
1100        exclude_socket: &[ComInterfaceSocketUUID],
1101    ) -> Option<ComInterfaceSocketUUID> {
1102        match endpoint.instance {
1103            // find socket for any endpoint instance
1104            EndpointInstance::Any => {
1105                let options = EndpointIterateOptions {
1106                    only_direct: false,
1107                    exact_instance: false,
1108                    exclude_sockets: exclude_socket,
1109                };
1110                if let Some(socket) =
1111                    self.iterate_endpoint_sockets(endpoint, options).next()
1112                {
1113                    return Some(socket);
1114                }
1115                None
1116            }
1117
1118            // find socket for exact instance
1119            EndpointInstance::Instance(_) => {
1120                // iterate over all sockets of all interfaces
1121                let options = EndpointIterateOptions {
1122                    only_direct: false,
1123                    exact_instance: true,
1124                    exclude_sockets: exclude_socket,
1125                };
1126                if let Some(socket) =
1127                    self.iterate_endpoint_sockets(endpoint, options).next()
1128                {
1129                    return Some(socket);
1130                }
1131                None
1132            }
1133
1134            // TODO #185: how to handle broadcasts?
1135            EndpointInstance::All => {
1136                todo!("#186 Undescribed by author.")
1137            }
1138        }
1139    }
1140
1141    /// Finds the best socket over which to send a block to an endpoint.
1142    /// If a known socket is found, it is returned, otherwise the default socket is returned, if it
1143    /// exists and is not excluded.
1144    fn find_best_endpoint_socket(
1145        &self,
1146        endpoint: &Endpoint,
1147        exclude_sockets: &[ComInterfaceSocketUUID],
1148    ) -> Option<ComInterfaceSocketUUID> {
1149        // if the endpoint is the same as the hub endpoint, try to find an interface
1150        // that redirects @@local
1151        if endpoint == &self.endpoint
1152            && let Some(socket) = self
1153                .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1154        {
1155            return Some(socket);
1156        }
1157
1158        // find best known socket for endpoint
1159        let matching_socket =
1160            self.find_known_endpoint_socket(endpoint, exclude_sockets);
1161
1162        // if a matching socket is found, return it
1163        if matching_socket.is_some() {
1164            matching_socket
1165        }
1166        // otherwise, return the highest priority socket that is not excluded
1167        else {
1168            let sockets = self.fallback_sockets.borrow();
1169            for (socket_uuid, _, _) in sockets.iter() {
1170                let socket = self.get_socket_by_uuid(socket_uuid);
1171                info!(
1172                    "{}: Find best for {}: {} ({}); excluded:{}",
1173                    self.endpoint,
1174                    endpoint,
1175                    socket_uuid,
1176                    socket
1177                        .lock()
1178                        .unwrap()
1179                        .direct_endpoint
1180                        .clone()
1181                        .map(|e| e.to_string())
1182                        .unwrap_or("None".to_string()),
1183                    exclude_sockets.contains(socket_uuid)
1184                );
1185                if !exclude_sockets.contains(socket_uuid) {
1186                    return Some(socket_uuid.clone());
1187                }
1188            }
1189            None
1190        }
1191    }
1192
1193    /// Returns all receivers to which the block has to be sent, grouped by the
1194    /// outbound socket uuids
1195    fn get_outbound_receiver_groups(
1196        &self,
1197        // TODO #187: do we need the block here for additional information (match conditions),
1198        // otherwise receivers are enough
1199        block: &DXBBlock,
1200        mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1201    ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1202        if let Some(receivers) = block.receivers() {
1203            if !receivers.is_empty() {
1204                let endpoint_sockets = receivers
1205                    .iter()
1206                    .map(|e| {
1207                        // add sockets from endpoint blacklist
1208                        if let Some(blacklist) =
1209                            self.endpoint_sockets_blacklist.borrow().get(e)
1210                        {
1211                            exclude_sockets.extend(blacklist.iter().cloned());
1212                        }
1213                        let socket =
1214                            self.find_best_endpoint_socket(e, &exclude_sockets);
1215                        (socket, e)
1216                    })
1217                    .group_by(|(socket, _)| socket.clone())
1218                    .into_iter()
1219                    .map(|(socket, group)| {
1220                        let endpoints = group
1221                            .map(|(_, endpoint)| endpoint.clone())
1222                            .collect::<Vec<_>>();
1223                        (socket, endpoints)
1224                    })
1225                    .collect::<Vec<_>>();
1226
1227                Some(endpoint_sockets)
1228            } else {
1229                None
1230            }
1231        } else {
1232            None
1233        }
1234    }
1235
1236    /// Runs the update loop for the ComHub.
1237    /// This method will continuously handle incoming data, send out
1238    /// queued blocks and update the sockets.
1239    /// This is only used for internal tests - in a full runtime setup, the main runtime update loop triggers
1240    /// ComHub updates.
1241    pub fn _start_update_loop(self_rc: Rc<Self>) {
1242        // if already running, do nothing
1243        if *self_rc.update_loop_running.borrow() {
1244            return;
1245        }
1246
1247        // set update loop running flag
1248        *self_rc.update_loop_running.borrow_mut() = true;
1249
1250        spawn_with_panic_notify(async move {
1251            while *self_rc.update_loop_running.borrow() {
1252                self_rc.update();
1253                sleep(Duration::from_millis(1)).await;
1254            }
1255            if let Some(sender) =
1256                self_rc.update_loop_stop_sender.borrow_mut().take()
1257            {
1258                sender.send(()).expect("Failed to send stop signal");
1259            }
1260        });
1261    }
1262
1263    /// Update all sockets and interfaces,
1264    /// collecting incoming data and sending out queued blocks.
1265    /// Updates are scheduled in local tasks and are not immediately visible.
1266    /// To wait for the block update to finish, use `wait_for_update_async()`.
1267    pub fn update(&self) {
1268        // update all interfaces
1269        self.update_interfaces();
1270
1271        // update own socket lists for routing
1272        self.update_sockets();
1273
1274        // update sockets block collectors
1275        self.collect_incoming_data();
1276
1277        // receive blocks from all sockets
1278        self.receive_incoming_blocks();
1279
1280        // send all queued blocks from all interfaces
1281        self.flush_outgoing_blocks();
1282    }
1283
1284    /// Prepares a block for sending out by updating the creation timestamp,
1285    /// sender and add signature and encryption if needed.
1286    fn prepare_own_block(&self, mut block: DXBBlock) -> DXBBlock {
1287        // TODO #188 signature & encryption
1288        let now = get_global_context().clone().time.lock().unwrap().now();
1289        block.routing_header.sender = self.endpoint.clone();
1290        block
1291            .block_header
1292            .flags_and_timestamp
1293            .set_creation_timestamp(now);
1294
1295        // set distance to 1
1296        block.routing_header.distance = 1;
1297        block
1298    }
1299
1300    /// Public method to send an outgoing block from this endpoint. Called by the runtime.
1301    pub fn send_own_block(
1302        &self,
1303        mut block: DXBBlock,
1304    ) -> Result<(), Vec<Endpoint>> {
1305        block = self.prepare_own_block(block);
1306        // add own outgoing block to history
1307        self.block_handler.add_block_to_history(&block, None);
1308        self.send_block(block, vec![], false)
1309    }
1310
1311    /// Sends a block and wait for a response block.
1312    /// Fix number of exact endpoints -> Expected responses are known at send time.
1313    /// TODO #189: make sure that mutating blocks are always send to specific endpoint instances (@jonas/0001), not generic endpoints like @jonas.
1314    /// @jonas -> response comes from a specific instance of @jonas/0001
1315    pub async fn send_own_block_await_response(
1316        &self,
1317        block: DXBBlock,
1318        options: ResponseOptions,
1319    ) -> Vec<Result<Response, ResponseError>> {
1320        let context_id = block.block_header.context_id;
1321        let section_index = block.block_header.section_index;
1322
1323        let has_exact_receiver_count = block.has_exact_receiver_count();
1324        let receivers = block.get_receivers();
1325
1326        let res = self.send_own_block(block);
1327        let failed_endpoints = res.err().unwrap_or_default();
1328
1329        // yield
1330        #[cfg(feature = "tokio_runtime")]
1331        yield_now().await;
1332
1333        let timeout = options
1334            .timeout
1335            .unwrap_or_default(self.options.default_receive_timeout);
1336
1337        // return fixed number of responses
1338        if has_exact_receiver_count {
1339            // if resolution strategy is ReturnOnAnyError or ReturnOnFirstResult, directly return if any endpoint failed
1340            if (options.resolution_strategy
1341                == ResponseResolutionStrategy::ReturnOnAnyError
1342                || options.resolution_strategy
1343                    == ResponseResolutionStrategy::ReturnOnFirstResult)
1344                && !failed_endpoints.is_empty()
1345            {
1346                // for each failed endpoint, set NotReachable error, for all others EarlyAbort
1347                return receivers
1348                    .iter()
1349                    .map(|receiver| {
1350                        if failed_endpoints.contains(receiver) {
1351                            Err(ResponseError::NotReachable(receiver.clone()))
1352                        } else {
1353                            Err(ResponseError::EarlyAbort(receiver.clone()))
1354                        }
1355                    })
1356                    .collect::<Vec<_>>();
1357            }
1358
1359            // store received responses in map for all receivers
1360            let mut responses = HashMap::new();
1361            let mut missing_response_count = receivers.len();
1362            for receiver in &receivers {
1363                responses.insert(
1364                    receiver.clone(),
1365                    if failed_endpoints.contains(receiver) {
1366                        Err(ResponseError::NotReachable(receiver.clone()))
1367                    } else {
1368                        Err(ResponseError::NoResponseAfterTimeout(
1369                            receiver.clone(),
1370                            timeout,
1371                        ))
1372                    },
1373                );
1374            }
1375            // directly subtract number of already failed endpoints from missing responses
1376            missing_response_count -= failed_endpoints.len();
1377
1378            info!(
1379                "Waiting for responses from receivers {}",
1380                receivers
1381                    .iter()
1382                    .map(|e| e.to_string())
1383                    .collect::<Vec<_>>()
1384                    .join(",")
1385            );
1386
1387            let mut rx = self
1388                .block_handler
1389                .register_incoming_block_observer(context_id, section_index);
1390
1391            let res = task::timeout(timeout, async {
1392                while let Some(section) = rx.next().await {
1393                    let mut received_response = false;
1394                    // get sender
1395                    let mut sender = section.get_sender();
1396                    // add to response for exactly matching endpoint instance
1397                    if let Some(response) = responses.get_mut(&sender) {
1398                        // check if the receiver is already set (= current set response is Err)
1399                        if response.is_err() {
1400                            *response = Ok(Response::ExactResponse(sender.clone(), section));
1401                            missing_response_count -= 1;
1402                            info!("Received expected response from {sender}");
1403                            received_response = true;
1404                        }
1405                        // already received a response from this exact sender - this should not happen
1406                        else {
1407                            error!("Received multiple responses from the same sender: {sender}");
1408                        }
1409                    }
1410                    // add to response for matching endpoint
1411                    else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1412                        info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1413                        sender = sender.any_instance_endpoint();
1414                        // check if the receiver is already set (= current set response is Err)
1415                        if response.is_err() {
1416                            *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1417                            missing_response_count -= 1;
1418                            received_response = true;
1419                        }
1420                        // already received a response from a matching endpoint - ignore
1421                        else {
1422                            info!("Received multiple resolved responses from the {}", &sender);
1423                        }
1424                    }
1425                    // response from unexpected sender
1426                    else {
1427                        error!("Received response from unexpected sender: {}", &sender);
1428                    }
1429
1430                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1431                    if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1432                        // set all other responses to EarlyAbort
1433                        for (receiver, response) in responses.iter_mut() {
1434                            if receiver != &sender {
1435                                *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1436                            }
1437                        }
1438                        break;
1439                    }
1440
1441                    // if all responses are received, break
1442                    if missing_response_count == 0 {
1443                        break;
1444                    }
1445                }
1446            }).await;
1447
1448            if res.is_err() {
1449                error!("Timeout waiting for responses");
1450            }
1451
1452            // return responses as vector
1453            responses.into_values().collect::<Vec<_>>()
1454        }
1455        // return all received responses
1456        else {
1457            let mut responses = vec![];
1458
1459            let res = task::timeout(timeout, async {
1460                let mut rx = self.block_handler.register_incoming_block_observer(context_id, section_index);
1461                while let Some(section) = rx.next().await {
1462                    // get sender
1463                    let sender = section.get_sender();
1464                    info!("Received response from {sender}");
1465                    // add to response for exactly matching endpoint instance
1466                    responses.push(Ok(Response::UnspecifiedResponse(section)));
1467
1468                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1469                    if options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1470                        break;
1471                    }
1472                }
1473            }).await;
1474
1475            if res.is_err() {
1476                info!("Timeout waiting for responses");
1477            }
1478
1479            responses
1480        }
1481    }
1482
1483    /// Sends a block to all endpoints specified in the block header.
1484    /// The routing algorithm decides which sockets are used to send the block, based on the endpoint.
1485    /// A block can be sent to multiple endpoints at the same time over a socket or to multiple sockets for each endpoint.
1486    /// The original_socket parameter is used to prevent sending the block back to the sender.
1487    /// When this method is called, the block is queued in the send queue.
1488    /// Returns an Err with a list of unreachable endpoints if the block could not be sent to all endpoints.
1489    pub fn send_block(
1490        &self,
1491        mut block: DXBBlock,
1492        exclude_sockets: Vec<ComInterfaceSocketUUID>,
1493        forked: bool,
1494    ) -> Result<(), Vec<Endpoint>> {
1495        let outbound_receiver_groups =
1496            self.get_outbound_receiver_groups(&block, exclude_sockets);
1497
1498        if outbound_receiver_groups.is_none() {
1499            error!("No outbound receiver groups found for block");
1500            return Err(vec![]);
1501        }
1502
1503        let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1504
1505        let mut unreachable_endpoints = vec![];
1506
1507        // currently only used for trace debugging (TODO: put behind debug flag)
1508        // if more than one addressed block is sent, the block is forked, thus the fork count is set to 0
1509        // for each forked block, the fork count is incremented
1510        // if only one block is sent, the block is just moved and not forked
1511        let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1512            Some(0)
1513        } else {
1514            None
1515        };
1516
1517        block.set_bounce_back(false);
1518
1519        for (receiver_socket, endpoints) in outbound_receiver_groups {
1520            if let Some(socket_uuid) = receiver_socket {
1521                self.send_block_to_endpoints_via_socket(
1522                    block.clone(),
1523                    &socket_uuid,
1524                    &endpoints,
1525                    fork_count,
1526                );
1527            } else {
1528                error!("{}: cannot send block, no receiver sockets found for endpoints {:?}", self.endpoint, endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>());
1529                unreachable_endpoints.extend(endpoints);
1530            }
1531            // increment fork_count if Some
1532            if let Some(count) = fork_count {
1533                fork_count = Some(count + 1);
1534            }
1535        }
1536
1537        if !unreachable_endpoints.is_empty() {
1538            return Err(unreachable_endpoints);
1539        }
1540        Ok(())
1541    }
1542
1543    /// Sends a block via a socket to a list of endpoints.
1544    /// Before the block is sent, it is modified to include the list of endpoints as receivers.
1545    fn send_block_to_endpoints_via_socket(
1546        &self,
1547        mut block: DXBBlock,
1548        socket_uuid: &ComInterfaceSocketUUID,
1549        endpoints: &[Endpoint],
1550        // currently only used for trace debugging (TODO: put behind debug flag)
1551        fork_count: Option<usize>,
1552    ) {
1553        block.set_receivers(endpoints);
1554
1555        // assuming the distance was already increment during redirect, we
1556        // effectively decrement the block distance by 1 if it is a bounce back
1557        if block.is_bounce_back() {
1558            block.routing_header.distance -= 2;
1559        }
1560
1561        // if type is Trace or TraceBack, add the outgoing socket to the hops
1562        match block.block_header.flags_and_timestamp.block_type() {
1563            BlockType::Trace | BlockType::TraceBack => {
1564                let distance = block.routing_header.distance;
1565                let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1566                let bounce_back = block.is_bounce_back();
1567
1568                self.add_hop_to_block_trace_data(
1569                    &mut block,
1570                    NetworkTraceHop {
1571                        endpoint: self.endpoint.clone(),
1572                        distance,
1573                        socket: NetworkTraceHopSocket::new(
1574                            self.get_com_interface_from_socket_uuid(
1575                                socket_uuid,
1576                            )
1577                            .borrow_mut()
1578                            .get_properties(),
1579                            socket_uuid.clone(),
1580                        ),
1581                        direction: NetworkTraceHopDirection::Outgoing,
1582                        fork_nr: new_fork_nr,
1583                        bounce_back,
1584                    },
1585                );
1586            }
1587            _ => {}
1588        }
1589
1590        let socket = self.get_socket_by_uuid(socket_uuid);
1591        let mut socket_ref = socket.lock().unwrap();
1592
1593        let is_broadcast = endpoints
1594            .iter()
1595            .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1596
1597        if is_broadcast
1598            && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1599            && (direct_endpoint == &self.endpoint
1600                || direct_endpoint == &Endpoint::LOCAL)
1601        {
1602            return;
1603        }
1604
1605        match &block.to_bytes() {
1606            Ok(bytes) => {
1607                info!(
1608                    "Sending block to socket {}: {}",
1609                    socket_uuid,
1610                    endpoints.iter().map(|e| e.to_string()).join(", ")
1611                );
1612
1613                // TODO #190: resend block if socket failed to send
1614                socket_ref.queue_outgoing_block(bytes);
1615            }
1616            Err(err) => {
1617                error!("Failed to convert block to bytes: {err:?}");
1618            }
1619        }
1620    }
1621
1622    /// Updates all interfaces to handle reconnections if the interface can be reconnected
1623    /// or remove the interface if it cannot be reconnected.
1624    fn update_interfaces(&self) {
1625        let mut to_remove = Vec::new();
1626        for (interface, _) in self.interfaces.borrow().values() {
1627            let uuid = interface.borrow().get_uuid().clone();
1628            let state = interface.borrow().get_state();
1629
1630            // If the interface has been proactively destroyed, remove it from the hub
1631            // and clean up the sockets. This happens when the user calls the destroy
1632            // method on the interface and not the remove_interface on the ComHub.
1633            if state.is_destroyed() {
1634                info!("Destroying interface on the ComHub {uuid}");
1635                to_remove.push(uuid);
1636            } else if state.is_not_connected()
1637                && interface.borrow_mut().get_properties().shall_reconnect()
1638            {
1639                // If the interface is disconnected and the interface has
1640                // reconnection enabled, check if the interface should be reconnected
1641                let interface_rc = interface.clone();
1642                let mut interface = interface.borrow_mut();
1643
1644                let already_connecting =
1645                    interface.get_state() == ComInterfaceState::Connecting;
1646
1647                if !already_connecting {
1648                    let config = interface.get_properties_mut();
1649
1650                    let reconnect_now = match &config.reconnection_config {
1651                        ReconnectionConfig::InstantReconnect => true,
1652                        ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1653                            ReconnectionConfig::check_reconnect_timeout(
1654                                config.close_timestamp,
1655                                timeout,
1656                            )
1657                        }
1658                        ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1659                            timeout,
1660                            attempts,
1661                        } => {
1662                            let max_attempts = attempts;
1663
1664                            // check if the attempts are not exceeded
1665                            let attempts = config.reconnect_attempts.unwrap_or(0);
1666                            let attempts = attempts + 1;
1667                            if attempts > *max_attempts {
1668                                to_remove.push(uuid.clone());
1669                                return;
1670                            }
1671
1672                            config.reconnect_attempts = Some(attempts);
1673
1674                            ReconnectionConfig::check_reconnect_timeout(
1675                                config.close_timestamp,
1676                                timeout,
1677                            )
1678                        }
1679                        ReconnectionConfig::NoReconnect => false,
1680                    };
1681                    if reconnect_now {
1682                        debug!("Reconnecting interface {uuid}");
1683                        interface.set_state(ComInterfaceState::Connecting);
1684                        spawn_with_panic_notify(async move {
1685                            let interface = interface_rc.clone();
1686                            let mut interface = interface.borrow_mut();
1687
1688                            let config = interface.get_properties_mut();
1689                            config.close_timestamp = None;
1690
1691                            let current_attempts =
1692                                config.reconnect_attempts.unwrap_or(0);
1693                            config.reconnect_attempts =
1694                                Some(current_attempts + 1);
1695
1696                            let res = interface.handle_open().await;
1697                            if res {
1698                                interface
1699                                    .set_state(ComInterfaceState::Connected);
1700                                // config.reconnect_attempts = None;
1701                            } else {
1702                                interface
1703                                    .set_state(ComInterfaceState::NotConnected);
1704                            }
1705                        });
1706                    } else {
1707                        debug!("Not reconnecting interface {uuid}");
1708                    }
1709                }
1710            }
1711        }
1712
1713        for uuid in to_remove {
1714            self.cleanup_interface(uuid);
1715        }
1716    }
1717
1718    /// Updates all known sockets for all interfaces to update routing
1719    /// information, remove deleted sockets and add new sockets and endpoint relations
1720    fn update_sockets(&self) {
1721        let mut new_sockets = Vec::new();
1722        let mut deleted_sockets = Vec::new();
1723        let mut registered_sockets = Vec::new();
1724
1725        for (interface, priority) in self.interfaces.borrow().values() {
1726            let socket_updates = interface.clone().borrow().get_sockets();
1727            let mut socket_updates = socket_updates.lock().unwrap();
1728
1729            registered_sockets
1730                .extend(socket_updates.socket_registrations.drain(..));
1731            new_sockets.extend(
1732                socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1733            );
1734            deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1735        }
1736
1737        for (socket, priority) in new_sockets {
1738            self.add_socket(socket.clone(), priority);
1739        }
1740        for socket_uuid in deleted_sockets {
1741            self.delete_socket(&socket_uuid);
1742        }
1743        for (socket_uuid, distance, endpoint) in registered_sockets {
1744            let socket = self.get_socket_by_uuid(&socket_uuid);
1745            self.register_socket_endpoint(socket, endpoint.clone(), distance)
1746                .unwrap_or_else(|e| {
1747                    error!(
1748                        "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
1749                    );
1750                });
1751        }
1752    }
1753
1754    /// Collects incoming data slices from all sockets. The sockets will call their
1755    /// BlockCollector to collect the data into blocks.
1756    fn collect_incoming_data(&self) {
1757        // update sockets, collect incoming data into full blocks
1758        for (socket, _) in self.sockets.borrow().values() {
1759            let mut socket_ref = socket.lock().unwrap();
1760            socket_ref.collect_incoming_data();
1761        }
1762    }
1763
1764    /// Collects all blocks from the receive queues of all sockets and process them
1765    /// in the receive_block method.
1766    fn receive_incoming_blocks(&self) {
1767        let mut blocks = vec![];
1768        // iterate over all sockets
1769        for (socket, _) in self.sockets.borrow().values() {
1770            let mut socket_ref = socket.lock().unwrap();
1771            let uuid = socket_ref.uuid.clone();
1772            let block_queue = socket_ref.get_incoming_block_queue();
1773            blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
1774        }
1775
1776        for (uuid, blocks) in blocks {
1777            for block in blocks.iter() {
1778                self.receive_block(block, uuid.clone());
1779            }
1780        }
1781    }
1782
1783    /// Sends all queued blocks from all interfaces.
1784    fn flush_outgoing_blocks(&self) {
1785        let interfaces = self.interfaces.borrow();
1786        for (interface, _) in interfaces.values() {
1787            com_interface::flush_outgoing_blocks(interface.clone());
1788        }
1789    }
1790}
1791
/// Strategy that determines when waiting for responses is finished.
///
/// Each variant names the JavaScript `Promise` combinator it is modeled
/// after. The default is [`ResponseResolutionStrategy::ReturnAfterAllSettled`].
#[derive(Default, PartialEq, Debug)]
pub enum ResponseResolutionStrategy {
    /// Promise.allSettled
    /// - For known fixed receivers:
    ///   return after all known sends are finished (either success or error
    ///   if block could not be sent / timed out)
    /// - For unknown receiver count:
    ///   return after timeout
    #[default]
    ReturnAfterAllSettled,

    /// Promise.all
    /// - For known fixed receivers:
    ///   return after all known sends are finished successfully,
    ///   return immediately if one send fails early (e.g. endpoint not reachable)
    /// - For unknown receiver count:
    ///   return after timeout
    ReturnOnAnyError,

    /// Promise.any
    /// Return after first successful response received
    ReturnOnFirstResponse,

    /// Promise.race
    /// Return after first response received (success or error)
    ReturnOnFirstResult,
}
1820
/// Timeout selection: either a caller supplied default or a custom duration.
#[derive(Default, Debug)]
pub enum ResponseTimeout {
    /// Use the default duration passed to
    /// [`ResponseTimeout::unwrap_or_default`].
    #[default]
    Default,
    /// Use exactly the contained duration.
    Custom(Duration),
}

impl ResponseTimeout {
    /// Resolves the timeout to a concrete duration.
    ///
    /// Returns the custom duration if one is set, otherwise `default`.
    pub fn unwrap_or_default(self, default: Duration) -> Duration {
        if let ResponseTimeout::Custom(timeout) = self {
            timeout
        } else {
            default
        }
    }
}
1836
/// Options bundling a resolution strategy and a timeout for responses.
///
/// The derived default uses [`ResponseResolutionStrategy::ReturnAfterAllSettled`]
/// and [`ResponseTimeout::Default`].
#[derive(Default, Debug)]
pub struct ResponseOptions {
    /// Strategy that decides when the responses are considered resolved.
    pub resolution_strategy: ResponseResolutionStrategy,
    /// Timeout setting; `ResponseTimeout::Default` defers to a default
    /// duration supplied where the timeout is resolved.
    pub timeout: ResponseTimeout,
}
1842
1843impl ResponseOptions {
1844    pub fn new_with_resolution_strategy(
1845        resolution_strategy: ResponseResolutionStrategy,
1846    ) -> Self {
1847        Self {
1848            resolution_strategy,
1849            ..ResponseOptions::default()
1850        }
1851    }
1852
1853    pub fn new_with_timeout(timeout: Duration) -> Self {
1854        Self {
1855            timeout: ResponseTimeout::Custom(timeout),
1856            ..ResponseOptions::default()
1857        }
1858    }
1859}
1860
/// A received response, wrapping the incoming section and, for two of the
/// three variants, an endpoint.
///
/// NOTE(review): the exact distinction between the variants is not visible
/// here; the descriptions below are derived from the variant names only -
/// confirm against the code that constructs them.
#[derive(Debug)]
pub enum Response {
    /// Response together with an endpoint (presumably the exact endpoint the
    /// request was addressed to).
    ExactResponse(Endpoint, IncomingSection),
    /// Response together with an endpoint (presumably an endpoint the
    /// addressed receiver was resolved to).
    ResolvedResponse(Endpoint, IncomingSection),
    /// Response without an associated endpoint.
    UnspecifiedResponse(IncomingSection),
}
1867
1868impl Response {
1869    pub fn take_incoming_section(self) -> IncomingSection {
1870        match self {
1871            Response::ExactResponse(_, section) => section,
1872            Response::ResolvedResponse(_, section) => section,
1873            Response::UnspecifiedResponse(section) => section,
1874        }
1875    }
1876}
1877
/// Errors that can occur instead of a response, each tied to an endpoint.
#[derive(Debug)]
pub enum ResponseError {
    /// No response was received for the endpoint within the given timeout.
    NoResponseAfterTimeout(Endpoint, Duration),
    /// The endpoint is not reachable.
    NotReachable(Endpoint),
    /// Waiting for the endpoint was aborted early.
    EarlyAbort(Endpoint),
}
1884
1885impl Display for ResponseError {
1886    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1887        match self {
1888            ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
1889                write!(
1890                    f,
1891                    "No response after timeout ({}s) for endpoint {}",
1892                    duration.as_secs(),
1893                    endpoint
1894                )
1895            }
1896            ResponseError::NotReachable(endpoint) => {
1897                write!(f, "Endpoint {endpoint} is not reachable")
1898            }
1899            ResponseError::EarlyAbort(endpoint) => {
1900                write!(f, "Early abort for endpoint {endpoint}")
1901            }
1902        }
1903    }
1904}