datex_core/network/
com_hub.rs

1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::SignatureType;
3use crate::stdlib::{cell::RefCell, rc::Rc};
4use crate::task::{self, sleep, spawn_with_panic_notify};
5use crate::utils::time::Time;
6
7use futures::channel::oneshot::Sender;
8use futures_util::StreamExt;
9use itertools::Itertools;
10use log::{debug, error, info, warn};
11use std::cmp::PartialEq;
12use std::collections::{HashMap, HashSet};
13use std::fmt::{Debug, Display, Formatter};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16#[cfg(feature = "tokio_runtime")]
17use tokio::task::yield_now;
18// FIXME #175 no-std
19
20use super::com_interfaces::com_interface::{
21    self, ComInterfaceError, ComInterfaceState
22};
23use super::com_interfaces::{
24    com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
25};
26use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
27use crate::global::dxb_block::{DXBBlock, IncomingSection};
28use crate::network::block_handler::{BlockHandler, BlockHistoryData};
29use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
30use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
31use crate::network::com_interfaces::com_interface_properties::{
32    InterfaceDirection, ReconnectionConfig,
33};
34use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
35use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
36use crate::values::value_container::ValueContainer;
37
/// Metadata stored for one socket through which an endpoint can be reached.
///
/// Values of this type are kept next to a socket UUID in
/// `ComHub::endpoint_sockets`.
#[derive(Debug, Clone)]
pub struct DynamicEndpointProperties {
    /// Timestamp returned by `Time::now()` when the entry was created.
    pub known_since: u64,
    /// Routing distance that was passed to `ComHub::register_socket_endpoint`.
    pub distance: i8,
    /// `true` if the endpoint was equal to the socket's `direct_endpoint`
    /// at registration time.
    pub is_direct: bool,
    /// Channel factor copied from the socket at registration time.
    pub channel_factor: u32,
    /// Direction copied from the socket at registration time.
    pub direction: InterfaceDirection,
}
46
/// Signature of an interface factory function.
///
/// A factory receives the setup data for the interface and returns either a
/// shared handle to the new interface or a [`ComInterfaceError`].
/// Factories are registered with `ComHub::register_interface_factory` and
/// invoked by `ComHub::create_interface`.
pub type ComInterfaceFactoryFn =
    fn(
        setup_data: ValueContainer,
    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
51
/// Configuration options of a [`ComHub`].
#[derive(Debug)]
pub struct ComHubOptions {
    /// Default timeout for receive operations (5 seconds when created via
    /// `Default`).
    // NOTE(review): the consumer of this value is not visible in this part
    // of the file - confirm where the timeout is applied.
    default_receive_timeout: Duration,
}
56
57impl Default for ComHubOptions {
58    fn default() -> Self {
59        ComHubOptions {
60            default_receive_timeout: Duration::from_secs(5),
61        }
62    }
63}
64
/// All sockets known to the hub, keyed by socket UUID.
/// Each entry holds the shared socket handle and the set of endpoints
/// currently associated with the socket.
type SocketMap = HashMap<
    ComInterfaceSocketUUID,
    (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
>;
/// All interfaces known to the hub, keyed by interface UUID.
/// Each entry holds the shared interface handle and the priority the
/// interface was added with.
type InterfaceMap = HashMap<
    ComInterfaceUUID,
    (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
>;
73
/// Central communication hub: owns the registered interfaces and sockets and
/// keeps the routing tables used to receive, handle and relay blocks.
///
/// All mutable state lives in `RefCell`s, so the methods take `&self`.
pub struct ComHub {
    /// the runtime endpoint of the hub (@me)
    pub endpoint: Endpoint,

    /// ComHub configuration options
    pub options: ComHubOptions,

    /// all registered interface factories, keyed by their interface type
    pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,

    /// all registered interfaces together with their priority,
    /// keyed by the interface UUID
    pub interfaces: RefCell<InterfaceMap>,

    /// all registered sockets, keyed by their UUID
    /// each entry contains the socket itself and the set of endpoints
    /// currently associated with it
    pub sockets: RefCell<SocketMap>,

    /// per endpoint, the sockets that must not be used to reach that endpoint
    /// (filled by `redirect_block` when a block comes back to this hub)
    pub endpoint_sockets_blacklist:
        RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,

    /// fallback sockets that are used if no socket that directly reaches the
    /// endpoint is available, sorted by priority
    /// each entry is (socket UUID, priority, socket direction)
    pub fallback_sockets:
        RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,

    /// for each endpoint, the sockets over which it can be reached, each with
    /// its DynamicEndpointProperties metadata
    pub endpoint_sockets: RefCell<
        HashMap<
            Endpoint,
            Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
        >,
    >,

    /// set to true if the update loop should be running
    /// when set to false, the update loop will stop
    update_loop_running: RefCell<bool>,
    /// one-shot sender belonging to the update loop
    // NOTE(review): presumably used to signal that the update loop has
    // stopped - its use is not visible in this part of the file, confirm.
    update_loop_stop_sender: RefCell<Option<Sender<()>>>,

    /// block history and handling of blocks addressed to this endpoint
    pub block_handler: BlockHandler,
}
116
117impl Debug for ComHub {
118    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("ComHub")
120            .field("endpoint", &self.endpoint)
121            .field("options", &self.options)
122            .field("sockets", &self.sockets)
123            .field(
124                "endpoint_sockets_blacklist",
125                &self.endpoint_sockets_blacklist,
126            )
127            .field("fallback_sockets", &self.fallback_sockets)
128            .field("endpoint_sockets", &self.endpoint_sockets)
129            .finish()
130    }
131}
132
/// Filter options for iterating over the sockets of an endpoint.
// NOTE(review): the code that consumes these options is not visible in this
// part of the file; the field descriptions below follow the field names and
// should be confirmed against the iterator implementation.
#[derive(Debug, Clone, Default)]
struct EndpointIterateOptions<'a> {
    /// presumably: only yield sockets that are direct sockets to the endpoint
    pub only_direct: bool,
    /// presumably: require the endpoint instance to match exactly
    pub exact_instance: bool,
    /// sockets that are excluded from the iteration
    pub exclude_sockets: &'a [ComInterfaceSocketUUID],
}
139
140impl Default for ComHub {
141    fn default() -> Self {
142        ComHub {
143            endpoint: Endpoint::default(),
144            options: ComHubOptions::default(),
145            interface_factories: RefCell::new(HashMap::new()),
146            interfaces: RefCell::new(HashMap::new()),
147            endpoint_sockets: RefCell::new(HashMap::new()),
148            block_handler: BlockHandler::new(),
149            sockets: RefCell::new(HashMap::new()),
150            fallback_sockets: RefCell::new(Vec::new()),
151            endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
152            update_loop_running: RefCell::new(false),
153            update_loop_stop_sender: RefCell::new(None),
154        }
155    }
156}
157
/// Fallback-routing priority of an interface.
///
/// The derived ordering follows the declaration order of the variants, so
/// `None` compares lower than every `Priority(_)`, and `Priority` values
/// compare by their number.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum InterfacePriority {
    /// The interface is never used for fallback routing.
    /// This is useful for interfaces which cannot communicate with the outside world or are not
    /// capable of redirecting large amounts of data
    None,
    /// The interface will be used for fallback routing if no other interface is available,
    /// depending on the defined priority
    /// A higher number means a higher priority
    Priority(u16),
}
169
170impl Default for InterfacePriority {
171    fn default() -> Self {
172        InterfacePriority::Priority(0)
173    }
174}
175
/// Errors returned by [`ComHub`] operations.
#[derive(Debug, Clone, PartialEq)]
pub enum ComHubError {
    /// An interface factory returned an error (see `ComHub::create_interface`).
    InterfaceError(ComInterfaceError),
    // NOTE(review): not produced in the visible part of this file.
    InterfaceCloseFailed,
    // NOTE(review): not produced in the visible part of this file.
    InterfaceNotConnected,
    /// No interface with the given UUID is registered.
    InterfaceDoesNotExist,
    /// An interface with the same UUID is already registered.
    InterfaceAlreadyExists,
    /// No factory is registered for the requested interface type.
    InterfaceTypeDoesNotExist,
    /// A priority was requested for an interface whose direction is
    /// `InterfaceDirection::In`.
    InvalidInterfaceDirectionForFallbackInterface,
    // NOTE(review): not produced in the visible part of this file.
    NoResponse,
    /// Opening the interface failed (`handle_open` returned `false`).
    InterfaceOpenError,
}
188
/// Reasons why `ComHub::register_socket_endpoint` can refuse a registration.
#[derive(Debug)]
pub enum SocketEndpointRegistrationError {
    /// The socket state is not open.
    SocketDisconnected,
    // NOTE(review): not produced in the visible part of this file.
    SocketUninitialized,
    /// The socket is already registered for the endpoint.
    SocketEndpointAlreadyRegistered,
}
195
196impl ComHub {
197    pub fn new(endpoint: impl Into<Endpoint>) -> ComHub {
198        ComHub {
199            endpoint: endpoint.into(),
200            ..ComHub::default()
201        }
202    }
203
204    pub async fn init(&self) -> Result<(), ComHubError> {
205        // add default local loopback interface
206        let local_interface = LocalLoopbackInterface::new();
207        self.open_and_add_interface(
208            Rc::new(RefCell::new(local_interface)),
209            InterfacePriority::None,
210        )
211        .await
212    }
213
214    /// Registers a new interface factory for a specific interface implementation.
215    /// This allows the ComHub to create new instances of the interface on demand.
216    pub fn register_interface_factory(
217        &self,
218        interface_type: String,
219        factory: ComInterfaceFactoryFn,
220    ) {
221        self.interface_factories
222            .borrow_mut()
223            .insert(interface_type, factory);
224    }
225
226    /// Creates a new interface instance using the registered factory
227    /// for the specified interface type if it exists.
228    /// The interface is opened and added to the ComHub.
229    pub async fn create_interface(
230        &self,
231        interface_type: &str,
232        setup_data: ValueContainer,
233        priority: InterfacePriority,
234    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
235        info!(
236            "creating interface {interface_type} with setup data: {setup_data:?}"
237        );
238        let interface_factories = self.interface_factories.borrow();
239        if let Some(factory) = interface_factories.get(interface_type) {
240            let interface =
241                factory(setup_data).map_err(ComHubError::InterfaceError)?;
242            drop(interface_factories);
243
244            self.open_and_add_interface(interface.clone(), priority)
245                .await
246                .map(|_| interface)
247        } else {
248            Err(ComHubError::InterfaceTypeDoesNotExist)
249        }
250    }
251
252    fn try_downcast<T: 'static>(
253        input: Rc<RefCell<dyn ComInterface>>,
254    ) -> Option<Rc<RefCell<T>>> {
255        // Try to get a reference to the inner value
256        if input.borrow().as_any().is::<T>() {
257            // SAFETY: We're ensuring T is the correct type via the check
258            let ptr = Rc::into_raw(input) as *const RefCell<T>;
259            unsafe { Some(Rc::from_raw(ptr)) }
260        } else {
261            None
262        }
263    }
264
265    pub fn get_interface_by_uuid<T: ComInterface>(
266        &self,
267        interface_uuid: &ComInterfaceUUID,
268    ) -> Option<Rc<RefCell<T>>> {
269        ComHub::try_downcast(
270            self.interfaces.borrow().get(interface_uuid)?.0.clone(),
271        )
272    }
273
274    pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
275        self.interfaces.borrow().contains_key(interface_uuid)
276    }
277
278    pub fn get_dyn_interface_by_uuid(
279        &self,
280        uuid: &ComInterfaceUUID,
281    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
282        self.interfaces
283            .borrow()
284            .get(uuid)
285            .map(|(interface, _)| interface.clone())
286    }
287
288    pub async fn open_and_add_interface(
289        &self,
290        interface: Rc<RefCell<dyn ComInterface>>,
291        priority: InterfacePriority,
292    ) -> Result<(), ComHubError> {
293        if interface.borrow().get_state() != ComInterfaceState::Connected {
294            // If interface is not connected, open it
295            // and wait for it to be connected
296            // FIXME #240: borrow_mut across await point
297            if !(interface.borrow_mut().handle_open().await) {
298                return Err(ComHubError::InterfaceOpenError);
299            }
300        }
301        self.add_interface(interface.clone(), priority)
302    }
303
304    pub fn add_interface(
305        &self,
306        interface: Rc<RefCell<dyn ComInterface>>,
307        priority: InterfacePriority,
308    ) -> Result<(), ComHubError> {
309        let uuid = interface.borrow().get_uuid().clone();
310        let mut interfaces = self.interfaces.borrow_mut();
311        if interfaces.contains_key(&uuid) {
312            return Err(ComHubError::InterfaceAlreadyExists);
313        }
314
315        // make sure the interface can send if a priority is set
316        if priority != InterfacePriority::None
317            && interface.borrow_mut().get_properties().direction
318                == InterfaceDirection::In
319        {
320            return Err(
321                ComHubError::InvalidInterfaceDirectionForFallbackInterface,
322            );
323        }
324
325        interfaces.insert(uuid, (interface, priority));
326        Ok(())
327    }
328
329    /// User can proactively remove an interface from the hub.
330    /// This will destroy the interface and it's sockets (perform deep cleanup)
331    pub async fn remove_interface(
332        &self,
333        interface_uuid: ComInterfaceUUID,
334    ) -> Result<(), ComHubError> {
335        info!("Removing interface {interface_uuid}");
336        let interface = self
337            .interfaces
338            .borrow_mut()
339            .get_mut(&interface_uuid.clone())
340            .ok_or(ComHubError::InterfaceDoesNotExist)?
341            .0
342            .clone();
343        {
344            // Async close the interface (stop tasks, server, cleanup internal data)
345            // FIXME #176: borrow_mut should not be used here
346            let mut interface = interface.borrow_mut();
347            interface.handle_destroy().await;
348        }
349
350        // Remove old sockets from ComHub that have been deleted by the interface destroy_sockets()
351        self.update_sockets();
352
353        self.cleanup_interface(interface_uuid)
354            .ok_or(ComHubError::InterfaceDoesNotExist)?;
355
356        Ok(())
357    }
358
359    /// The internal cleanup function that removes the interface from the hub
360    /// and disabled the default interface if it was set to this interface
361    fn cleanup_interface(
362        &self,
363        interface_uuid: ComInterfaceUUID,
364    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
365        let interface = self
366            .interfaces
367            .borrow_mut()
368            .remove(&interface_uuid)
369            .or(None)?
370            .0;
371        Some(interface)
372    }
373
374    pub(crate) fn receive_block(
375        &self,
376        block: &DXBBlock,
377        socket_uuid: ComInterfaceSocketUUID,
378    ) {
379        info!("{} received block: {}", self.endpoint, block);
380
381        // ignore invalid blocks (e.g. invalid signature)
382        if !self.validate_block(block) {
383            warn!("Block validation failed. Dropping block...");
384            return;
385        }
386
387        let block_type = block.block_header.flags_and_timestamp.block_type();
388
389        // register in block history
390        let is_new_block = !self.block_handler.is_block_in_history(block);
391        // assign endpoint to socket if none is assigned
392        // only if a new block and the sender in not the local endpoint
393        if is_new_block && block.routing_header.sender != self.endpoint {
394            self.register_socket_endpoint_from_incoming_block(
395                socket_uuid.clone(),
396                block,
397            );
398        }
399
400        let receivers = block.receiver_endpoints();
401        if !receivers.is_empty() {
402            let is_for_own = receivers.iter().any(|e| {
403                e == &self.endpoint
404                    || e == &Endpoint::ANY
405                    || e == &Endpoint::ANY_ALL_INSTANCES
406            });
407
408            // handle blocks for own endpoint
409            if is_for_own && block_type != BlockType::Hello {
410                info!("Block is for this endpoint");
411
412                match block_type {
413                    BlockType::Trace => {
414                        self.handle_trace_block(block, socket_uuid.clone());
415                    }
416                    BlockType::TraceBack => {
417                        self.handle_trace_back_block(
418                            block,
419                            socket_uuid.clone(),
420                        );
421                    }
422                    _ => {
423                        self.block_handler.handle_incoming_block(block.clone());
424                    }
425                };
426            }
427
428            // TODO #177: handle this via TTL, not explicitly for Hello blocks
429            let should_relay =
430                // don't relay "Hello" blocks sent to own endpoint
431                !(
432                    is_for_own && block_type == BlockType::Hello
433                );
434
435            // relay the block to other endpoints
436            if should_relay {
437                // get all receivers that the block must be relayed to
438                let remaining_receivers = if is_for_own {
439                    &self.get_remote_receivers(&receivers)
440                } else {
441                    &receivers
442                };
443
444                // relay the block to all receivers
445                if !remaining_receivers.is_empty() {
446                    match block_type {
447                        BlockType::Trace | BlockType::TraceBack => {
448                            self.redirect_trace_block(
449                                block.clone_with_new_receivers(
450                                    remaining_receivers,
451                                ),
452                                socket_uuid.clone(),
453                                is_for_own,
454                            );
455                        }
456                        _ => {
457                            self.redirect_block(
458                                block.clone_with_new_receivers(
459                                    remaining_receivers,
460                                ),
461                                socket_uuid.clone(),
462                                is_for_own,
463                            );
464                        }
465                    }
466                }
467            }
468        }
469
470        // add to block history
471        if is_new_block {
472            self.block_handler
473                .add_block_to_history(block, Some(socket_uuid));
474        }
475    }
476
477    /// Returns a list of all receivers from a given ReceiverEndpoints
478    /// excluding the local endpoint
479    fn get_remote_receivers(
480        &self,
481        receiver_endpoints: &[Endpoint],
482    ) -> Vec<Endpoint> {
483        receiver_endpoints
484            .iter()
485            .filter(|e| e != &&self.endpoint)
486            .cloned()
487            .collect::<Vec<_>>()
488    }
489
490    /// Registers the socket endpoint from an incoming block
491    /// if the endpoint is not already registered for the socket
492    fn register_socket_endpoint_from_incoming_block(
493        &self,
494        socket_uuid: ComInterfaceSocketUUID,
495        block: &DXBBlock,
496    ) {
497        let socket = self.get_socket_by_uuid(&socket_uuid);
498        let mut socket_ref = socket.lock().unwrap();
499
500        let distance = block.routing_header.distance;
501        let sender = block.routing_header.sender.clone();
502
503        // set as direct endpoint if distance = 0
504        if socket_ref.direct_endpoint.is_none() && distance == 1 {
505            info!(
506                "Setting direct endpoint for socket {}: {}",
507                socket_ref.uuid, sender
508            );
509            socket_ref.direct_endpoint = Some(sender.clone());
510        }
511
512        drop(socket_ref);
513        match self.register_socket_endpoint(
514            socket.clone(),
515            sender.clone(),
516            distance,
517        ) {
518            Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
519                debug!(
520                    "Socket already registered for endpoint {sender}",
521                );
522            }
523            Err(error) => {
524                panic!("Failed to register socket endpoint {sender}: {error:?}");
525            },
526            Ok(_) => { }
527        }
528    }
529
530    /// Prepares a block and relays it to the given receivers.
531    /// The routing distance is incremented by 1.
532    pub(crate) fn redirect_block(
533        &self,
534        mut block: DXBBlock,
535        incoming_socket: ComInterfaceSocketUUID,
536        // only for debugging traces
537        forked: bool,
538    ) {
539        let receivers = block.receiver_endpoints();
540
541        // check if block has already passed this endpoint (-> bounced back block)
542        // and add to blacklist for all receiver endpoints
543        let history_block_data =
544            self.block_handler.get_block_data_from_history(&block);
545        if history_block_data.is_some() {
546            for receiver in &receivers {
547                if receiver != &self.endpoint {
548                    info!(
549                        "{}: Adding socket {} to blacklist for receiver {}",
550                        self.endpoint, incoming_socket, receiver
551                    );
552                    self.endpoint_sockets_blacklist
553                        .borrow_mut()
554                        .entry(receiver.clone())
555                        .or_default()
556                        .insert(incoming_socket.clone());
557                }
558            }
559        }
560
561        // increment distance for next hop
562        block.routing_header.distance += 1;
563
564        // TODO #178: ensure ttl is >= 1
565        // decrease TTL by 1
566        block.routing_header.ttl -= 1;
567        // if ttl is 0, drop the block
568        if block.routing_header.ttl == 0 {
569            warn!("Block TTL expired. Dropping block...");
570            return;
571        }
572
573        let mut prefer_incoming_socket_for_bounce_back = false;
574        // if we are the original sender of the block, don't send again (prevent loop) and send
575        // bounce back block with all receivers
576        let res = {
577            if block.routing_header.sender == self.endpoint {
578                // if not bounce back block, directly send back to incoming socket (prevent loop)
579                prefer_incoming_socket_for_bounce_back =
580                    !block.is_bounce_back();
581                Err(receivers.to_vec())
582            } else {
583                let mut excluded_sockets = vec![incoming_socket.clone()];
584                if let Some(BlockHistoryData {
585                    original_socket_uuid: Some(original_socket_uuid),
586                }) = &history_block_data
587                {
588                    excluded_sockets.push(original_socket_uuid.clone())
589                }
590                self.send_block(block.clone(), excluded_sockets, forked)
591            }
592        };
593
594        // send block for unreachable endpoints back to the sender
595        if let Err(unreachable_endpoints) = res {
596            // try to send back to original socket
597            // if already in history, get original socket from history
598            // otherwise, directly send back to the incoming socket
599            let send_back_socket = if !prefer_incoming_socket_for_bounce_back
600                && let Some(history_block_data) = history_block_data
601            {
602                history_block_data.original_socket_uuid
603            } else {
604                Some(incoming_socket.clone())
605            };
606
607            // If a send_back_socket is set, the original block is not from this endpoint,
608            // so we can send it back to the original socket
609            if let Some(send_back_socket) = send_back_socket {
610                // never send a bounce back block back again to the incoming socket
611                if block.is_bounce_back() && send_back_socket == incoming_socket
612                {
613                    warn!(
614                        "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
615                        self.endpoint
616                    );
617                } else if self
618                    .get_socket_by_uuid(&send_back_socket)
619                    .lock()
620                    .unwrap()
621                    .can_send()
622                {
623                    block.set_bounce_back(true);
624                    self.send_block_to_endpoints_via_socket(
625                        block,
626                        &send_back_socket,
627                        &unreachable_endpoints,
628                        if forked { Some(0) } else { None },
629                    )
630                } else {
631                    error!(
632                        "Tried to send bounce back block, but cannot send back to incoming socket"
633                    )
634                }
635            }
636            // Otherwise, the block originated from this endpoint, we can just call send again
637            // and try to send it via other remaining sockets that are not on the blacklist for the
638            // block receiver
639            else {
640                self.send_block(block, vec![], forked).unwrap_or_else(|_| {
641                    error!(
642                        "Failed to send out block to {}",
643                        unreachable_endpoints
644                            .iter()
645                            .map(|e| e.to_string())
646                            .join(",")
647                    );
648                });
649            }
650        }
651    }
652
653    /// Validates a block including it's signature if set
654    /// TODO #378 @Norbert
655    fn validate_block(&self, block: &DXBBlock) -> bool {
656        // TODO #179 check for creation time, withdraw if too old (TBD) or in the future
657
658        let is_signed =
659            block.routing_header.flags.signature_type() != SignatureType::None;
660
661        match is_signed {
662            true => {
663                // TODO #180: verify signature and abort if invalid
664                // Check if signature is following in some later block and add them to
665                // a pool of incoming blocks awaiting some signature
666                true
667            }
668            false => {
669                let endpoint = block.routing_header.sender.clone();
670                let is_trusted = {
671                    cfg_if::cfg_if! {
672                        if #[cfg(feature = "debug")] {
673                            use crate::runtime::global_context::get_global_context;
674                            get_global_context().debug_flags.allow_unsigned_blocks
675                        }
676                        else {
677                            // TODO #181 Check if the sender is trusted (endpoint + interface) connection
678                            false
679                        }
680                    }
681                };
682                match is_trusted {
683                    true => true,
684                    false => {
685                        warn!(
686                            "Block received by {endpoint} is not signed. Dropping block..."
687                        );
688                        false
689                    }
690                }
691            }
692        }
693    }
694
695    /// Registers a new endpoint that is reachable over the socket if the socket is not
696    /// already registered, it will be added to the socket list.
697    /// If the provided endpoint is not the same as the socket endpoint, it is registered
698    /// as an indirect socket to the endpoint
699    pub fn register_socket_endpoint(
700        &self,
701        socket: Arc<Mutex<ComInterfaceSocket>>,
702        endpoint: Endpoint,
703        distance: i8,
704    ) -> Result<(), SocketEndpointRegistrationError> {
705        log::info!(
706            "{} registering endpoint {} for socket {}",
707            self.endpoint,
708            endpoint,
709            socket.lock().unwrap().uuid
710        );
711        let socket_ref = socket.lock().unwrap();
712
713        // if the registered endpoint is the same as the socket endpoint,
714        // this is a direct socket to the endpoint
715        let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
716
717        // cannot register endpoint if socket is not connected
718        if !socket_ref.state.is_open() {
719            return Err(SocketEndpointRegistrationError::SocketDisconnected);
720        }
721
722        // check if the socket is already registered for the endpoint
723        if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
724            && entries
725                .iter()
726                .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
727        {
728            return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
729        }
730
731        let socket_uuid = socket_ref.uuid.clone();
732        let channel_factor = socket_ref.channel_factor;
733        let direction = socket_ref.direction.clone();
734        drop(socket_ref);
735
736        // add endpoint to socket endpoint list
737        self.add_socket_endpoint(&socket_uuid, endpoint.clone());
738
739        // add socket to endpoint socket list
740        self.add_endpoint_socket(
741            &endpoint,
742            socket_uuid,
743            distance,
744            is_direct,
745            channel_factor,
746            direction,
747        );
748
749        // resort sockets for endpoint
750        self.sort_sockets(&endpoint);
751
752        Ok(())
753    }
754
755    /// Waits for all background tasks scheduled by the update() function to finish
756    /// This includes block flushes from `flush_outgoing_blocks()`
757    /// and interface (re)-connections from `update_interfaces()`
758    pub async fn wait_for_update_async(&self) {
759        loop {
760            let mut is_done = true;
761            for interface in self.interfaces.borrow().values() {
762                let interface = interface.0.clone();
763                let interface = interface.borrow_mut();
764                let outgoing_blocks_count =
765                    interface.get_info().outgoing_blocks_count.get();
766                // blocks are still sent out on this interface
767                if outgoing_blocks_count > 0 {
768                    is_done = false;
769                    break;
770                }
771                // interface is still in connection task
772                if interface.get_state() == ComInterfaceState::Connecting {
773                    is_done = false;
774                    break;
775                }
776            }
777            if is_done {
778                break;
779            }
780            sleep(Duration::from_millis(10)).await;
781        }
782    }
783
784    /// Updates all sockets and interfaces,
785    /// collecting incoming data and sending out queued blocks.
786    /// In contrast to the update() function, this function is asynchronous
787    /// and will wait for all background tasks scheduled by the update() function to finish
788    pub async fn update_async(&self) {
789        self.update();
790        self.wait_for_update_async().await;
791    }
792
793    /// Adds a socket to the socket list for a specific endpoint,
794    /// attaching metadata as DynamicEndpointProperties
795    fn add_endpoint_socket(
796        &self,
797        endpoint: &Endpoint,
798        socket_uuid: ComInterfaceSocketUUID,
799        distance: i8,
800        is_direct: bool,
801        channel_factor: u32,
802        direction: InterfaceDirection,
803    ) {
804        let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
805        if !endpoint_sockets.contains_key(endpoint) {
806            endpoint_sockets.insert(endpoint.clone(), Vec::new());
807        }
808
809        let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
810        endpoint_sockets.push((
811            socket_uuid,
812            DynamicEndpointProperties {
813                known_since: Time::now(),
814                distance,
815                is_direct,
816                channel_factor,
817                direction,
818            },
819        ));
820    }
821
822    /// Adds a socket to the socket list.
823    /// If the priority is not set to `InterfacePriority::None`, the socket
824    /// is also registered as a fallback socket for outgoing connections with the
825    /// specified priority.
826    fn add_socket(
827        &self,
828        socket: Arc<Mutex<ComInterfaceSocket>>,
829        priority: InterfacePriority,
830    ) {
831        let socket_ref = socket.lock().unwrap();
832        let socket_uuid = socket_ref.uuid.clone();
833        if self.sockets.borrow().contains_key(&socket_ref.uuid) {
834            panic!("Socket {} already exists in ComHub", socket_ref.uuid);
835        }
836
837        // info!(
838        //     "Adding socket {} to ComHub with priority {:?}",
839        //     socket_ref.uuid, priority
840        // );
841
842        if !socket_ref.can_send() && priority != InterfacePriority::None {
843            panic!(
844                "Socket {} cannot be used for fallback routing, since it has no send capability",
845                socket_ref.uuid
846            );
847        }
848        let direction = socket_ref.direction.clone();
849
850        self.sockets
851            .borrow_mut()
852            .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
853
854        // add outgoing socket to fallback sockets list if they have a priority flag
855        if socket_ref.can_send() {
856            match priority {
857                InterfacePriority::None => {
858                    // do nothing
859                }
860                InterfacePriority::Priority(priority) => {
861                    // add socket to fallback sockets list
862                    self.add_fallback_socket(&socket_uuid, priority, direction);
863                }
864            }
865
866            // send empty block to socket to say hello
867            let mut block: DXBBlock = DXBBlock::default();
868            block
869                .block_header
870                .flags_and_timestamp
871                .set_block_type(BlockType::Hello);
872            block
873                .routing_header
874                .flags
875                .set_signature_type(SignatureType::Unencrypted);
876            // TODO #182 include fingerprint of the own public key into body
877
878            let block = self.prepare_own_block(block);
879
880            drop(socket_ref);
881            self.send_block_to_endpoints_via_socket(
882                block,
883                &socket_uuid,
884                &[Endpoint::ANY],
885                None,
886            );
887        }
888    }
889
890    /// Registers a socket as a fallback socket for outgoing connections
891    /// that can be used if no known route exists for an endpoint
892    /// Note: only sockets that support sending data should be used as fallback sockets
893    fn add_fallback_socket(
894        &self,
895        socket_uuid: &ComInterfaceSocketUUID,
896        priority: u16,
897        direction: InterfaceDirection,
898    ) {
899        // add to vec
900        let mut fallback_sockets = self.fallback_sockets.borrow_mut();
901        fallback_sockets.push((socket_uuid.clone(), priority, direction));
902        // first sort by direction (InOut before Out - only In is not allowed)
903        // second sort by priority
904        fallback_sockets.sort_by_key(|(_, priority, direction)| {
905            let dir_rank = match direction {
906                InterfaceDirection::InOut => 0,
907                InterfaceDirection::Out => 1,
908                InterfaceDirection::In => {
909                    panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
910                }
911            };
912            (dir_rank, std::cmp::Reverse(*priority))
913        });
914    }
915
916    /// Removes a socket from the socket list
917    fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
918        self.sockets
919            .borrow_mut()
920            .remove(socket_uuid)
921            .or_else(|| panic!("Socket {socket_uuid} not found in ComHub"));
922
923        // remove socket from endpoint socket list
924        // remove endpoint key from endpoint_sockets if not sockets present
925        self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
926            sockets.retain(|(uuid, _)| uuid != socket_uuid);
927            !sockets.is_empty()
928        });
929
930        // remove socket if it is the default socket
931        self.fallback_sockets
932            .borrow_mut()
933            .retain(|(uuid, _, _)| uuid != socket_uuid);
934    }
935
936    /// Adds an endpoint to the endpoint list of a specific socket
937    fn add_socket_endpoint(
938        &self,
939        socket_uuid: &ComInterfaceSocketUUID,
940        endpoint: Endpoint,
941    ) {
942        assert!(
943            self.sockets.borrow().contains_key(socket_uuid),
944            "Socket not found in ComHub"
945        );
946        // add endpoint to socket endpoint list
947        self.sockets
948            .borrow_mut()
949            .get_mut(socket_uuid)
950            .unwrap()
951            .1
952            .insert(endpoint.clone());
953    }
954
/// Sorts the socket list stored for `endpoint` in place.
///
/// The comparator orders entries by, in this order:
/// - `is_direct`: direct sockets first
/// - `channel_factor`: larger values first
/// - `distance`: larger values first
/// - `known_since`: larger (more recent) values first
///
/// When the crate is built with the `debug` feature and the global debug flag
/// `enable_deterministic_behavior` is set, the `known_since` criterion is
/// skipped, so that the order in which blocks were received has no effect on
/// the routing priorities.
///
/// NOTE(review): send capability (`direction`) is not part of this comparator,
/// although other code appears to assume sendable sockets are sorted first —
/// confirm whether that ordering is established elsewhere.
/// NOTE(review): `distance` is sorted descending (larger distance first) —
/// confirm this is intended.
///
/// # Panics
/// If no socket list exists for `endpoint`.
fn sort_sockets(&self, endpoint: &Endpoint) {
    let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
    let sockets = endpoint_sockets.get_mut(endpoint).unwrap();

    sockets.sort_by(|(_, a), (_, b)| {
        // all criteria compare `b` against `a`, i.e. sort descending
        b.is_direct
            .cmp(&a.is_direct)
            .then_with(|| b.channel_factor.cmp(&a.channel_factor))
            .then_with(|| b.distance.cmp(&a.distance))
            .then_with(
                || {
                    // final tie breaker: time since the route is known,
                    // disabled in deterministic debug mode
                    cfg_if::cfg_if! {
                        if #[cfg(feature = "debug")] {
                            use crate::runtime::global_context::get_global_context;
                            use std::cmp::Ordering;
                            if get_global_context().debug_flags.enable_deterministic_behavior {
                                Ordering::Equal
                            }
                            else {
                                b.known_since.cmp(&a.known_since)
                            }
                        }
                        else {
                            b.known_since.cmp(&a.known_since)
                        }
                    }
                }
            )

    });
}
996
997    /// Returns the socket for a given UUID
998    /// The socket must be registered in the ComHub,
999    /// otherwise a panic will be triggered
1000    pub(crate) fn get_socket_by_uuid(
1001        &self,
1002        socket_uuid: &ComInterfaceSocketUUID,
1003    ) -> Arc<Mutex<ComInterfaceSocket>> {
1004        self.sockets
1005            .borrow()
1006            .get(socket_uuid)
1007            .map(|socket| socket.0.clone())
1008            .unwrap_or_else(|| {
1009                panic!("Socket for uuid {socket_uuid} not found")
1010            })
1011    }
1012
1013    /// Returns the com interface for a given UUID
1014    /// The interface must be registered in the ComHub,
1015    /// otherwise a panic will be triggered
1016    pub(crate) fn get_com_interface_by_uuid(
1017        &self,
1018        interface_uuid: &ComInterfaceUUID,
1019    ) -> Rc<RefCell<dyn ComInterface>> {
1020        self.interfaces
1021            .borrow()
1022            .get(interface_uuid)
1023            .unwrap_or_else(|| {
1024                panic!("Interface for uuid {interface_uuid} not found")
1025            })
1026            .0
1027            .clone()
1028    }
1029
1030    /// Returns the com interface for a given socket UUID
1031    /// The interface and socket must be registered in the ComHub,
1032    /// otherwise a panic will be triggered
1033    pub(crate) fn get_com_interface_from_socket_uuid(
1034        &self,
1035        socket_uuid: &ComInterfaceSocketUUID,
1036    ) -> Rc<RefCell<dyn ComInterface>> {
1037        let socket = self.get_socket_by_uuid(socket_uuid);
1038        let socket = socket.lock().unwrap();
1039        self.get_com_interface_by_uuid(&socket.interface_uuid)
1040    }
1041
1042    /// Returns an iterator over all sockets for a given endpoint
1043    /// The sockets are yielded in the order of their priority, starting with the
1044    /// highest priority socket (the best socket for sending data to the endpoint)
1045    fn iterate_endpoint_sockets<'a>(
1046        &'a self,
1047        endpoint: &'a Endpoint,
1048        options: EndpointIterateOptions<'a>,
1049    ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1050        std::iter::from_coroutine(
1051            #[coroutine]
1052            move || {
1053                let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1054                // TODO #183: can we optimize this to avoid cloning the endpoint_sockets vector?
1055                let endpoint_sockets =
1056                    endpoint_sockets_borrow.get(endpoint).cloned();
1057                if endpoint_sockets.is_none() {
1058                    return;
1059                }
1060                for (socket_uuid, _) in endpoint_sockets.unwrap() {
1061                    {
1062                        let socket = self.get_socket_by_uuid(&socket_uuid);
1063                        let socket = socket.lock().unwrap();
1064
1065                        // check if only_direct is set and the endpoint equals the direct endpoint of the socket
1066                        if options.only_direct
1067                            && socket.direct_endpoint.is_some()
1068                            && socket.direct_endpoint.as_ref().unwrap()
1069                                == endpoint
1070                        {
1071                            debug!(
1072                                "No direct socket found for endpoint {endpoint}. Skipping..."
1073                            );
1074                            continue;
1075                        }
1076
1077                        // check if the socket is excluded if exclude_socket is set
1078                        if options.exclude_sockets.contains(&socket.uuid) {
1079                            debug!(
1080                                "Socket {} is excluded for endpoint {}. Skipping...",
1081                                socket.uuid, endpoint
1082                            );
1083                            continue;
1084                        }
1085
1086                        // TODO #184 optimize and separate outgoing/non-outgoing sockets for endpoint
1087                        // only yield outgoing sockets
1088                        // if a non-outgoing socket is found, all following sockets
1089                        // will also be non-outgoing
1090                        if !socket.can_send() {
1091                            info!(
1092                                "Socket {} is not outgoing for endpoint {}. Skipping...",
1093                                socket.uuid, endpoint
1094                            );
1095                            return;
1096                        }
1097                    }
1098
1099                    debug!(
1100                        "Found matching socket {socket_uuid} for endpoint {endpoint}"
1101                    );
1102                    yield socket_uuid.clone()
1103                }
1104            },
1105        )
1106    }
1107
1108    /// Finds the best matching socket over which an endpoint is known to be reachable.
1109    fn find_known_endpoint_socket(
1110        &self,
1111        endpoint: &Endpoint,
1112        exclude_socket: &[ComInterfaceSocketUUID],
1113    ) -> Option<ComInterfaceSocketUUID> {
1114        match endpoint.instance {
1115            // find socket for any endpoint instance
1116            EndpointInstance::Any => {
1117                let options = EndpointIterateOptions {
1118                    only_direct: false,
1119                    exact_instance: false,
1120                    exclude_sockets: exclude_socket,
1121                };
1122                if let Some(socket) =
1123                    self.iterate_endpoint_sockets(endpoint, options).next()
1124                {
1125                    return Some(socket);
1126                }
1127                None
1128            }
1129
1130            // find socket for exact instance
1131            EndpointInstance::Instance(_) => {
1132                // iterate over all sockets of all interfaces
1133                let options = EndpointIterateOptions {
1134                    only_direct: false,
1135                    exact_instance: true,
1136                    exclude_sockets: exclude_socket,
1137                };
1138                if let Some(socket) =
1139                    self.iterate_endpoint_sockets(endpoint, options).next()
1140                {
1141                    return Some(socket);
1142                }
1143                None
1144            }
1145
1146            // TODO #185: how to handle broadcasts?
1147            EndpointInstance::All => {
1148                todo!("#186 Undescribed by author.")
1149            }
1150        }
1151    }
1152
1153    /// Finds the best socket over which to send a block to an endpoint.
1154    /// If a known socket is found, it is returned, otherwise the default socket is returned, if it
1155    /// exists and is not excluded.
1156    fn find_best_endpoint_socket(
1157        &self,
1158        endpoint: &Endpoint,
1159        exclude_sockets: &[ComInterfaceSocketUUID],
1160    ) -> Option<ComInterfaceSocketUUID> {
1161        // if the endpoint is the same as the hub endpoint, try to find an interface
1162        // that redirects @@local
1163        if endpoint == &self.endpoint
1164            && let Some(socket) = self
1165                .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1166        {
1167            return Some(socket);
1168        }
1169
1170        // find best known socket for endpoint
1171        let matching_socket =
1172            self.find_known_endpoint_socket(endpoint, exclude_sockets);
1173
1174        // if a matching socket is found, return it
1175        if matching_socket.is_some() {
1176            matching_socket
1177        }
1178        // otherwise, return the highest priority socket that is not excluded
1179        else {
1180            let sockets = self.fallback_sockets.borrow();
1181            for (socket_uuid, _, _) in sockets.iter() {
1182                let socket = self.get_socket_by_uuid(socket_uuid);
1183                info!(
1184                    "{}: Find best for {}: {} ({}); excluded:{}",
1185                    self.endpoint,
1186                    endpoint,
1187                    socket_uuid,
1188                    socket
1189                        .lock()
1190                        .unwrap()
1191                        .direct_endpoint
1192                        .clone()
1193                        .map(|e| e.to_string())
1194                        .unwrap_or("None".to_string()),
1195                    exclude_sockets.contains(socket_uuid)
1196                );
1197                if !exclude_sockets.contains(socket_uuid) {
1198                    return Some(socket_uuid.clone());
1199                }
1200            }
1201            None
1202        }
1203    }
1204
1205    /// Returns all receivers to which the block has to be sent, grouped by the
1206    /// outbound socket uuids
1207    fn get_outbound_receiver_groups(
1208        &self,
1209        // TODO #187: do we need the block here for additional information (match conditions),
1210        // otherwise receivers are enough
1211        block: &DXBBlock,
1212        mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1213    ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1214        let receivers = block.receiver_endpoints();
1215
1216        if !receivers.is_empty() {
1217            let endpoint_sockets = receivers
1218                .iter()
1219                .map(|e| {
1220                    // add sockets from endpoint blacklist
1221                    if let Some(blacklist) =
1222                        self.endpoint_sockets_blacklist.borrow().get(e)
1223                    {
1224                        exclude_sockets.extend(blacklist.iter().cloned());
1225                    }
1226                    let socket =
1227                        self.find_best_endpoint_socket(e, &exclude_sockets);
1228                    (socket, e)
1229                })
1230                .group_by(|(socket, _)| socket.clone())
1231                .into_iter()
1232                .map(|(socket, group)| {
1233                    let endpoints = group
1234                        .map(|(_, endpoint)| endpoint.clone())
1235                        .collect::<Vec<_>>();
1236                    (socket, endpoints)
1237                })
1238                .collect::<Vec<_>>();
1239
1240            Some(endpoint_sockets)
1241        } else {
1242            None
1243        }
1244    }
1245
1246    /// Runs the update loop for the ComHub.
1247    /// This method will continuously handle incoming data, send out
1248    /// queued blocks and update the sockets.
1249    /// This is only used for internal tests - in a full runtime setup, the main runtime update loop triggers
1250    /// ComHub updates.
1251    pub fn _start_update_loop(self_rc: Rc<Self>) {
1252        // if already running, do nothing
1253        if *self_rc.update_loop_running.borrow() {
1254            return;
1255        }
1256
1257        // set update loop running flag
1258        *self_rc.update_loop_running.borrow_mut() = true;
1259
1260        spawn_with_panic_notify(async move {
1261            while *self_rc.update_loop_running.borrow() {
1262                self_rc.update();
1263                sleep(Duration::from_millis(1)).await;
1264            }
1265            if let Some(sender) =
1266                self_rc.update_loop_stop_sender.borrow_mut().take()
1267            {
1268                sender.send(()).expect("Failed to send stop signal");
1269            }
1270        });
1271    }
1272
/// Updates all sockets and interfaces,
/// collecting incoming data and sending out queued blocks.
///
/// One pass runs the following steps in this order: update interfaces,
/// update the socket lists used for routing, collect incoming data,
/// receive incoming blocks, flush outgoing blocks.
///
/// Updates are scheduled in local tasks and are not immediately visible.
/// To wait for the block update to finish, use `wait_for_update_async()`.
pub fn update(&self) {
    // update all interfaces
    self.update_interfaces();

    // update own socket lists for routing
    self.update_sockets();

    // update sockets block collectors
    self.collect_incoming_data();

    // receive blocks from all sockets
    self.receive_incoming_blocks();

    // send all queued blocks from all interfaces
    self.flush_outgoing_blocks();
}
1293
1294    /// Prepares a block for sending out by updating the creation timestamp,
1295    /// sender and add signature and encryption if needed.
1296    /// TODO #379 @Norbert
1297    fn prepare_own_block(&self, mut block: DXBBlock) -> DXBBlock {
1298        // TODO #188 signature & encryption
1299        let now = Time::now();
1300        block.routing_header.sender = self.endpoint.clone();
1301        block
1302            .block_header
1303            .flags_and_timestamp
1304            .set_creation_timestamp(now);
1305
1306        // set distance to 1
1307        block.routing_header.distance = 1;
1308        block
1309    }
1310
1311    /// Public method to send an outgoing block from this endpoint. Called by the runtime.
1312    pub fn send_own_block(
1313        &self,
1314        mut block: DXBBlock,
1315    ) -> Result<(), Vec<Endpoint>> {
1316        block = self.prepare_own_block(block);
1317        // add own outgoing block to history
1318        self.block_handler.add_block_to_history(&block, None);
1319        self.send_block(block, vec![], false)
1320    }
1321
1322    /// Sends a block and wait for a response block.
1323    /// Fix number of exact endpoints -> Expected responses are known at send time.
1324    /// TODO #189: make sure that mutating blocks are always send to specific endpoint instances (@jonas/0001), not generic endpoints like @jonas.
1325    /// @jonas -> response comes from a specific instance of @jonas/0001
1326    pub async fn send_own_block_await_response(
1327        &self,
1328        block: DXBBlock,
1329        options: ResponseOptions,
1330    ) -> Vec<Result<Response, ResponseError>> {
1331        let context_id = block.block_header.context_id;
1332        let section_index = block.block_header.section_index;
1333
1334        let has_exact_receiver_count = block.has_exact_receiver_count();
1335        let receivers = block.receiver_endpoints();
1336
1337        let res = self.send_own_block(block);
1338        let failed_endpoints = res.err().unwrap_or_default();
1339
1340        // yield
1341        #[cfg(feature = "tokio_runtime")]
1342        yield_now().await;
1343
1344        let timeout = options
1345            .timeout
1346            .unwrap_or_default(self.options.default_receive_timeout);
1347
1348        // return fixed number of responses
1349        if has_exact_receiver_count {
1350            // if resolution strategy is ReturnOnAnyError or ReturnOnFirstResult, directly return if any endpoint failed
1351            if (options.resolution_strategy
1352                == ResponseResolutionStrategy::ReturnOnAnyError
1353                || options.resolution_strategy
1354                    == ResponseResolutionStrategy::ReturnOnFirstResult)
1355                && !failed_endpoints.is_empty()
1356            {
1357                // for each failed endpoint, set NotReachable error, for all others EarlyAbort
1358                return receivers
1359                    .iter()
1360                    .map(|receiver| {
1361                        if failed_endpoints.contains(receiver) {
1362                            Err(ResponseError::NotReachable(receiver.clone()))
1363                        } else {
1364                            Err(ResponseError::EarlyAbort(receiver.clone()))
1365                        }
1366                    })
1367                    .collect::<Vec<_>>();
1368            }
1369
1370            // store received responses in map for all receivers
1371            let mut responses = HashMap::new();
1372            let mut missing_response_count = receivers.len();
1373            for receiver in &receivers {
1374                responses.insert(
1375                    receiver.clone(),
1376                    if failed_endpoints.contains(receiver) {
1377                        Err(ResponseError::NotReachable(receiver.clone()))
1378                    } else {
1379                        Err(ResponseError::NoResponseAfterTimeout(
1380                            receiver.clone(),
1381                            timeout,
1382                        ))
1383                    },
1384                );
1385            }
1386            // directly subtract number of already failed endpoints from missing responses
1387            missing_response_count -= failed_endpoints.len();
1388
1389            info!(
1390                "Waiting for responses from receivers {}",
1391                receivers
1392                    .iter()
1393                    .map(|e| e.to_string())
1394                    .collect::<Vec<_>>()
1395                    .join(",")
1396            );
1397
1398            let mut rx = self
1399                .block_handler
1400                .register_incoming_block_observer(context_id, section_index);
1401
1402            let res = task::timeout(timeout, async {
1403                while let Some(section) = rx.next().await {
1404                    let mut received_response = false;
1405                    // get sender
1406                    let mut sender = section.get_sender();
1407                    // add to response for exactly matching endpoint instance
1408                    if let Some(response) = responses.get_mut(&sender) {
1409                        // check if the receiver is already set (= current set response is Err)
1410                        if response.is_err() {
1411                            *response = Ok(Response::ExactResponse(sender.clone(), section));
1412                            missing_response_count -= 1;
1413                            info!("Received expected response from {sender}");
1414                            received_response = true;
1415                        }
1416                        // already received a response from this exact sender - this should not happen
1417                        else {
1418                            error!("Received multiple responses from the same sender: {sender}");
1419                        }
1420                    }
1421                    // add to response for matching endpoint
1422                    else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1423                        info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1424                        sender = sender.any_instance_endpoint();
1425                        // check if the receiver is already set (= current set response is Err)
1426                        if response.is_err() {
1427                            *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1428                            missing_response_count -= 1;
1429                            received_response = true;
1430                        }
1431                        // already received a response from a matching endpoint - ignore
1432                        else {
1433                            info!("Received multiple resolved responses from the {}", &sender);
1434                        }
1435                    }
1436                    // response from unexpected sender
1437                    else {
1438                        error!("Received response from unexpected sender: {}", &sender);
1439                    }
1440
1441                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1442                    if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1443                        // set all other responses to EarlyAbort
1444                        for (receiver, response) in responses.iter_mut() {
1445                            if receiver != &sender {
1446                                *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1447                            }
1448                        }
1449                        break;
1450                    }
1451
1452                    // if all responses are received, break
1453                    if missing_response_count == 0 {
1454                        break;
1455                    }
1456                }
1457            }).await;
1458
1459            if res.is_err() {
1460                error!("Timeout waiting for responses");
1461            }
1462
1463            // return responses as vector
1464            responses.into_values().collect::<Vec<_>>()
1465        }
1466        // return all received responses
1467        else {
1468            let mut responses = vec![];
1469
1470            let res = task::timeout(timeout, async {
1471                let mut rx =
1472                    self.block_handler.register_incoming_block_observer(
1473                        context_id,
1474                        section_index,
1475                    );
1476                while let Some(section) = rx.next().await {
1477                    // get sender
1478                    let sender = section.get_sender();
1479                    info!("Received response from {sender}");
1480                    // add to response for exactly matching endpoint instance
1481                    responses.push(Ok(Response::UnspecifiedResponse(section)));
1482
1483                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1484                    if options.resolution_strategy
1485                        == ResponseResolutionStrategy::ReturnOnFirstResult
1486                    {
1487                        break;
1488                    }
1489                }
1490            })
1491            .await;
1492
1493            if res.is_err() {
1494                info!("Timeout waiting for responses");
1495            }
1496
1497            responses
1498        }
1499    }
1500
1501    /// Sends a block to all endpoints specified in the block header.
1502    /// The routing algorithm decides which sockets are used to send the block, based on the endpoint.
1503    /// A block can be sent to multiple endpoints at the same time over a socket or to multiple sockets for each endpoint.
1504    /// The original_socket parameter is used to prevent sending the block back to the sender.
1505    /// When this method is called, the block is queued in the send queue.
1506    /// Returns an Err with a list of unreachable endpoints if the block could not be sent to all endpoints.
1507    pub fn send_block(
1508        &self,
1509        mut block: DXBBlock,
1510        exclude_sockets: Vec<ComInterfaceSocketUUID>,
1511        forked: bool,
1512    ) -> Result<(), Vec<Endpoint>> {
1513        let outbound_receiver_groups =
1514            self.get_outbound_receiver_groups(&block, exclude_sockets);
1515
1516        if outbound_receiver_groups.is_none() {
1517            error!("No outbound receiver groups found for block");
1518            return Err(vec![]);
1519        }
1520
1521        let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1522
1523        let mut unreachable_endpoints = vec![];
1524
1525        // currently only used for trace debugging (TODO: put behind debug flag)
1526        // if more than one addressed block is sent, the block is forked, thus the fork count is set to 0
1527        // for each forked block, the fork count is incremented
1528        // if only one block is sent, the block is just moved and not forked
1529        let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1530            Some(0)
1531        } else {
1532            None
1533        };
1534
1535        block.set_bounce_back(false);
1536
1537        for (receiver_socket, endpoints) in outbound_receiver_groups {
1538            if let Some(socket_uuid) = receiver_socket {
1539                self.send_block_to_endpoints_via_socket(
1540                    block.clone(),
1541                    &socket_uuid,
1542                    &endpoints,
1543                    fork_count,
1544                );
1545            } else {
1546                error!(
1547                    "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1548                    self.endpoint,
1549                    endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1550                );
1551                unreachable_endpoints.extend(endpoints);
1552            }
1553            // increment fork_count if Some
1554            if let Some(count) = fork_count {
1555                fork_count = Some(count + 1);
1556            }
1557        }
1558
1559        if !unreachable_endpoints.is_empty() {
1560            return Err(unreachable_endpoints);
1561        }
1562        Ok(())
1563    }
1564
1565    /// Sends a block via a socket to a list of endpoints.
1566    /// Before the block is sent, it is modified to include the list of endpoints as receivers.
1567    fn send_block_to_endpoints_via_socket(
1568        &self,
1569        mut block: DXBBlock,
1570        socket_uuid: &ComInterfaceSocketUUID,
1571        endpoints: &[Endpoint],
1572        // currently only used for trace debugging (TODO: put behind debug flag)
1573        fork_count: Option<usize>,
1574    ) {
1575        block.set_receivers(endpoints);
1576
1577        // assuming the distance was already increment during redirect, we
1578        // effectively decrement the block distance by 1 if it is a bounce back
1579        if block.is_bounce_back() {
1580            block.routing_header.distance -= 2;
1581        }
1582
1583        // if type is Trace or TraceBack, add the outgoing socket to the hops
1584        match block.block_header.flags_and_timestamp.block_type() {
1585            BlockType::Trace | BlockType::TraceBack => {
1586                let distance = block.routing_header.distance;
1587                let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1588                let bounce_back = block.is_bounce_back();
1589
1590                self.add_hop_to_block_trace_data(
1591                    &mut block,
1592                    NetworkTraceHop {
1593                        endpoint: self.endpoint.clone(),
1594                        distance,
1595                        socket: NetworkTraceHopSocket::new(
1596                            self.get_com_interface_from_socket_uuid(
1597                                socket_uuid,
1598                            )
1599                            .borrow_mut()
1600                            .get_properties(),
1601                            socket_uuid.clone(),
1602                        ),
1603                        direction: NetworkTraceHopDirection::Outgoing,
1604                        fork_nr: new_fork_nr,
1605                        bounce_back,
1606                    },
1607                );
1608            }
1609            _ => {}
1610        }
1611
1612        let socket = self.get_socket_by_uuid(socket_uuid);
1613        let mut socket_ref = socket.lock().unwrap();
1614
1615        let is_broadcast = endpoints
1616            .iter()
1617            .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1618
1619        if is_broadcast
1620            && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1621            && (direct_endpoint == &self.endpoint
1622                || direct_endpoint == &Endpoint::LOCAL)
1623        {
1624            return;
1625        }
1626
1627        match &block.to_bytes() {
1628            Ok(bytes) => {
1629                info!(
1630                    "Sending block to socket {}: {}",
1631                    socket_uuid,
1632                    endpoints.iter().map(|e| e.to_string()).join(", ")
1633                );
1634
1635                // TODO #190: resend block if socket failed to send
1636                socket_ref.queue_outgoing_block(bytes);
1637            }
1638            Err(err) => {
1639                error!("Failed to convert block to bytes: {err:?}");
1640            }
1641        }
1642    }
1643
1644    /// Updates all interfaces to handle reconnections if the interface can be reconnected
1645    /// or remove the interface if it cannot be reconnected.
1646    fn update_interfaces(&self) {
1647        let mut to_remove = Vec::new();
1648        for (interface, _) in self.interfaces.borrow().values() {
1649            let uuid = interface.borrow().get_uuid().clone();
1650            let state = interface.borrow().get_state();
1651
1652            // If the interface has been proactively destroyed, remove it from the hub
1653            // and clean up the sockets. This happens when the user calls the destroy
1654            // method on the interface and not the remove_interface on the ComHub.
1655            if state.is_destroyed() {
1656                info!("Destroying interface on the ComHub {uuid}");
1657                to_remove.push(uuid);
1658            } else if state.is_not_connected()
1659                && interface.borrow_mut().get_properties().shall_reconnect()
1660            {
1661                // If the interface is disconnected and the interface has
1662                // reconnection enabled, check if the interface should be reconnected
1663                let interface_rc = interface.clone();
1664                let mut interface = interface.borrow_mut();
1665
1666                let already_connecting =
1667                    interface.get_state() == ComInterfaceState::Connecting;
1668
1669                if !already_connecting {
1670                    let config = interface.get_properties_mut();
1671
1672                    let reconnect_now = match &config.reconnection_config {
1673                        ReconnectionConfig::InstantReconnect => true,
1674                        ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1675                            ReconnectionConfig::check_reconnect_timeout(
1676                                config.close_timestamp,
1677                                timeout,
1678                            )
1679                        }
1680                        ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1681                            timeout,
1682                            attempts,
1683                        } => {
1684                            let max_attempts = attempts;
1685
1686                            // check if the attempts are not exceeded
1687                            let attempts = config.reconnect_attempts.unwrap_or(0);
1688                            let attempts = attempts + 1;
1689                            if attempts > *max_attempts {
1690                                to_remove.push(uuid.clone());
1691                                return;
1692                            }
1693
1694                            config.reconnect_attempts = Some(attempts);
1695
1696                            ReconnectionConfig::check_reconnect_timeout(
1697                                config.close_timestamp,
1698                                timeout,
1699                            )
1700                        }
1701                        ReconnectionConfig::NoReconnect => false,
1702                    };
1703                    if reconnect_now {
1704                        debug!("Reconnecting interface {uuid}");
1705                        interface.set_state(ComInterfaceState::Connecting);
1706                        spawn_with_panic_notify(async move {
1707                            let interface = interface_rc.clone();
1708                            let mut interface = interface.borrow_mut();
1709
1710                            let config = interface.get_properties_mut();
1711                            config.close_timestamp = None;
1712
1713                            let current_attempts =
1714                                config.reconnect_attempts.unwrap_or(0);
1715                            config.reconnect_attempts =
1716                                Some(current_attempts + 1);
1717
1718                            let res = interface.handle_open().await;
1719                            if res {
1720                                interface
1721                                    .set_state(ComInterfaceState::Connected);
1722                                // config.reconnect_attempts = None;
1723                            } else {
1724                                interface
1725                                    .set_state(ComInterfaceState::NotConnected);
1726                            }
1727                        });
1728                    } else {
1729                        debug!("Not reconnecting interface {uuid}");
1730                    }
1731                }
1732            }
1733        }
1734
1735        for uuid in to_remove {
1736            self.cleanup_interface(uuid);
1737        }
1738    }
1739
1740    /// Updates all known sockets for all interfaces to update routing
1741    /// information, remove deleted sockets and add new sockets and endpoint relations
1742    fn update_sockets(&self) {
1743        let mut new_sockets = Vec::new();
1744        let mut deleted_sockets = Vec::new();
1745        let mut registered_sockets = Vec::new();
1746
1747        for (interface, priority) in self.interfaces.borrow().values() {
1748            let socket_updates = interface.clone().borrow().get_sockets();
1749            let mut socket_updates = socket_updates.lock().unwrap();
1750
1751            registered_sockets
1752                .extend(socket_updates.socket_registrations.drain(..));
1753            new_sockets.extend(
1754                socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1755            );
1756            deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1757        }
1758
1759        for (socket, priority) in new_sockets {
1760            self.add_socket(socket.clone(), priority);
1761        }
1762        for socket_uuid in deleted_sockets {
1763            self.delete_socket(&socket_uuid);
1764        }
1765        for (socket_uuid, distance, endpoint) in registered_sockets {
1766            let socket = self.get_socket_by_uuid(&socket_uuid);
1767            self.register_socket_endpoint(socket, endpoint.clone(), distance)
1768                .unwrap_or_else(|e| {
1769                    error!(
1770                        "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
1771                    );
1772                });
1773        }
1774    }
1775
1776    /// Collects incoming data slices from all sockets. The sockets will call their
1777    /// BlockCollector to collect the data into blocks.
1778    fn collect_incoming_data(&self) {
1779        // update sockets, collect incoming data into full blocks
1780        for (socket, _) in self.sockets.borrow().values() {
1781            let mut socket_ref = socket.lock().unwrap();
1782            socket_ref.collect_incoming_data();
1783        }
1784    }
1785
1786    /// Collects all blocks from the receive queues of all sockets and process them
1787    /// in the receive_block method.
1788    fn receive_incoming_blocks(&self) {
1789        let mut blocks = vec![];
1790        // iterate over all sockets
1791        for (socket, _) in self.sockets.borrow().values() {
1792            let mut socket_ref = socket.lock().unwrap();
1793            let uuid = socket_ref.uuid.clone();
1794            let block_queue = socket_ref.get_incoming_block_queue();
1795            blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
1796        }
1797
1798        for (uuid, blocks) in blocks {
1799            for block in blocks.iter() {
1800                self.receive_block(block, uuid.clone());
1801            }
1802        }
1803    }
1804
1805    /// Sends all queued blocks from all interfaces.
1806    fn flush_outgoing_blocks(&self) {
1807        let interfaces = self.interfaces.borrow();
1808        for (interface, _) in interfaces.values() {
1809            com_interface::flush_outgoing_blocks(interface.clone());
1810        }
1811    }
1812}
1813
/// Strategy that decides when waiting for responses is finished.
/// The variants are named after the JavaScript `Promise` combinators they mirror.
#[derive(Default, PartialEq, Debug)]
pub enum ResponseResolutionStrategy {
    /// Promise.allSettled
    /// - For known fixed receivers:
    ///   return after all known sends are finished (either success or error
    ///   if block could not be sent / timed out)
    /// - For unknown receiver count:
    ///   return after timeout
    #[default]
    ReturnAfterAllSettled,

    /// Promise.all
    /// - For known fixed receivers:
    ///   return after all known sends are finished successfully
    ///   return immediately if one send fails early (e.g. endpoint not reachable)
    /// - For unknown receiver count:
    ///   return after timeout
    ReturnOnAnyError,

    /// Promise.any
    /// Return after first successful response received
    ReturnOnFirstResponse,

    /// Promise.race
    /// Return after first response received (success or error)
    ReturnOnFirstResult,
}
1842
/// Timeout selection: either defer to a fallback or use an explicit duration.
#[derive(Default, Debug)]
pub enum ResponseTimeout {
    /// No explicit timeout; a fallback duration is supplied when resolving.
    #[default]
    Default,
    /// An explicit timeout duration.
    Custom(Duration),
}

impl ResponseTimeout {
    /// Resolves this value to a concrete duration: the custom timeout if one
    /// was set, otherwise the given `default`.
    pub fn unwrap_or_default(self, default: Duration) -> Duration {
        if let ResponseTimeout::Custom(custom) = self {
            custom
        } else {
            default
        }
    }
}
1858
/// Bundles a response resolution strategy with a response timeout.
/// `Default` yields `ReturnAfterAllSettled` combined with `ResponseTimeout::Default`.
#[derive(Default, Debug)]
pub struct ResponseOptions {
    /// Strategy that decides when waiting for responses is finished.
    pub resolution_strategy: ResponseResolutionStrategy,
    /// Timeout selection (fallback or explicit duration).
    pub timeout: ResponseTimeout,
}
1864
1865impl ResponseOptions {
1866    pub fn new_with_resolution_strategy(
1867        resolution_strategy: ResponseResolutionStrategy,
1868    ) -> Self {
1869        Self {
1870            resolution_strategy,
1871            ..ResponseOptions::default()
1872        }
1873    }
1874
1875    pub fn new_with_timeout(timeout: Duration) -> Self {
1876        Self {
1877            timeout: ResponseTimeout::Custom(timeout),
1878            ..ResponseOptions::default()
1879        }
1880    }
1881}
1882
/// A response carrying an [`IncomingSection`], optionally tagged with an [`Endpoint`].
///
/// NOTE(review): the variant names suggest they differ in how precisely the
/// responder could be matched to an addressed receiver (exact match, resolved
/// endpoint, or unknown) — confirm against the code that constructs them.
#[derive(Debug)]
pub enum Response {
    /// Section together with the endpoint it is associated with.
    ExactResponse(Endpoint, IncomingSection),
    /// Section together with the endpoint it is associated with.
    ResolvedResponse(Endpoint, IncomingSection),
    /// Section without an associated endpoint.
    UnspecifiedResponse(IncomingSection),
}
1889
1890impl Response {
1891    pub fn take_incoming_section(self) -> IncomingSection {
1892        match self {
1893            Response::ExactResponse(_, section) => section,
1894            Response::ResolvedResponse(_, section) => section,
1895            Response::UnspecifiedResponse(section) => section,
1896        }
1897    }
1898}
1899
/// Reasons why no response was obtained for an endpoint.
#[derive(Debug)]
pub enum ResponseError {
    /// No response arrived for the endpoint within the given timeout.
    NoResponseAfterTimeout(Endpoint, Duration),
    /// The endpoint is not reachable.
    NotReachable(Endpoint),
    /// Waiting for the endpoint was aborted early.
    EarlyAbort(Endpoint),
}
1906
1907impl Display for ResponseError {
1908    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1909        match self {
1910            ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
1911                write!(
1912                    f,
1913                    "No response after timeout ({}s) for endpoint {}",
1914                    duration.as_secs(),
1915                    endpoint
1916                )
1917            }
1918            ResponseError::NotReachable(endpoint) => {
1919                write!(f, "Endpoint {endpoint} is not reachable")
1920            }
1921            ResponseError::EarlyAbort(endpoint) => {
1922                write!(f, "Early abort for endpoint {endpoint}")
1923            }
1924        }
1925    }
1926}