datex_core/network/
com_hub.rs

1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::SignatureType;
3use crate::stdlib::boxed::Box;
4use crate::stdlib::{cell::RefCell, rc::Rc};
5use crate::task::{self, sleep, spawn_with_panic_notify};
6use crate::utils::time::Time;
7use core::prelude::rust_2024::*;
8use core::result::Result;
9use futures::channel::oneshot::Sender;
10use itertools::Itertools;
11use log::{debug, error, info, warn};
12use core::cmp::PartialEq;
13use crate::collections::{HashMap, HashSet};
14use core::fmt::{Debug, Display, Formatter};
15use crate::stdlib::sync::{Arc};
16use crate::std_sync::Mutex;
17use core::time::Duration;
18#[cfg(feature = "tokio_runtime")]
19use tokio::task::yield_now;
20use crate::stdlib::vec::Vec;
21use crate::stdlib::vec;
22use crate::stdlib::string::String;
23use crate::stdlib::string::ToString;
24use super::com_interfaces::com_interface::{
25    self, ComInterfaceError, ComInterfaceState
26};
27use super::com_interfaces::{
28    com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
29};
30use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
31use crate::global::dxb_block::{DXBBlock, IncomingSection};
32use crate::network::block_handler::{BlockHandler, BlockHistoryData};
33use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
34use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
35use crate::network::com_interfaces::com_interface_properties::{
36    InterfaceDirection, ReconnectionConfig,
37};
38use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
39use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
40use crate::runtime::AsyncContext;
41use crate::values::value_container::ValueContainer;
42
/// Metadata kept for every socket that is registered for an endpoint.
/// Stored next to the socket UUID in `ComHub::endpoint_sockets`.
#[derive(Debug, Clone)]
pub struct DynamicEndpointProperties {
    /// NOTE(review): presumably the time at which the endpoint was first
    /// registered for the socket; the unit is not visible here - confirm.
    pub known_since: u64,
    /// Routing distance to the endpoint via this socket.
    /// NOTE(review): presumably derived from the routing header distance of
    /// incoming blocks - confirm against `register_socket_endpoint`.
    pub distance: i8,
    /// NOTE(review): presumably true if the endpoint is the direct peer of
    /// the socket - confirm.
    pub is_direct: bool,
    /// NOTE(review): meaning not visible in this part of the file - confirm.
    pub channel_factor: u32,
    /// Direction of the interface the socket belongs to.
    /// NOTE(review): assumed from the type name - confirm.
    pub direction: InterfaceDirection,
}
51
/// Function pointer type of an interface factory.
///
/// A factory receives the setup data as a [`ValueContainer`] and returns
/// either a new shared interface instance or a [`ComInterfaceError`].
/// Factories are stored in `ComHub::interface_factories` and invoked by
/// `ComHub::create_interface`.
pub type ComInterfaceFactoryFn =
    fn(
        setup_data: ValueContainer,
    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
56
/// Configuration options of a [`ComHub`].
#[derive(Debug)]
pub struct ComHubOptions {
    /// Default timeout for receiving; the `Default` impl sets it to 5 seconds.
    /// NOTE(review): presumably applied when waiting for response blocks -
    /// the consumer is not visible in this part of the file.
    default_receive_timeout: Duration,
}
61
62impl Default for ComHubOptions {
63    fn default() -> Self {
64        ComHubOptions {
65            default_receive_timeout: Duration::from_secs(5),
66        }
67    }
68}
69
/// Maps a socket UUID to the shared socket and the set of endpoints
/// currently associated with it (see `ComHub::sockets`).
type SocketMap = HashMap<
    ComInterfaceSocketUUID,
    (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
>;
/// Maps an interface UUID to the shared interface and the priority it was
/// added with (see `ComHub::interfaces`).
type InterfaceMap = HashMap<
    ComInterfaceUUID,
    (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
>;
78
/// Callback invoked for incoming blocks; receives the block and the UUID of
/// the socket it arrived on.
pub type IncomingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static>;

/// Callback for outgoing blocks; receives the block, a socket UUID and a
/// list of endpoints.
/// NOTE(review): presumably the socket the block is sent through and the
/// receivers reached via it - the call site is not visible here.
pub type OutgoingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static>;
84
/// Central communication hub of a runtime: owns the registered interfaces
/// and sockets, keeps the per-endpoint socket bookkeeping and handles
/// incoming blocks (local handling and relaying).
///
/// All mutable state lives in `RefCell`s and the interfaces are held in
/// `Rc`s, so a `ComHub` is used through shared references.
pub struct ComHub {
    /// the runtime endpoint of the hub (@me)
    pub endpoint: Endpoint,

    /// async context handed to the hub on construction
    /// NOTE(review): usage is not visible in this part of the file
    pub async_context: AsyncContext,

    /// ComHub configuration options
    pub options: ComHubOptions,

    /// a list of all available interface factories, keyed by their interface type
    pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,

    /// a list of all available interfaces, keyed by their UUID
    pub interfaces: RefCell<InterfaceMap>,

    /// a list of all available sockets, keyed by their UUID
    /// contains the socket itself and a list of endpoints currently associated with it
    pub sockets: RefCell<SocketMap>,

    /// a blacklist of sockets that are not allowed to be used for a specific endpoint
    pub endpoint_sockets_blacklist:
        RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,

    /// fallback sockets that are used if no direct endpoint reachable socket is available
    /// sorted by priority
    pub fallback_sockets:
        RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,

    /// a list of all available sockets for each endpoint, with additional
    /// DynamicEndpointProperties metadata
    pub endpoint_sockets: RefCell<
        HashMap<
            Endpoint,
            Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
        >,
    >,

    /// set to true if the update loop should be running
    /// when set to false, the update loop will stop
    update_loop_running: RefCell<bool>,
    /// one-shot sender that the update loop task takes and fires once the
    /// loop has exited
    update_loop_stop_sender: RefCell<Option<Sender<()>>>,

    /// handler for incoming blocks addressed to this endpoint; also keeps
    /// the block history used to detect already seen blocks
    pub block_handler: BlockHandler,

    /// callbacks invoked for every validated incoming block
    incoming_block_interceptors: RefCell<Vec<IncomingBlockInterceptor>>,
    /// callbacks registered for outgoing blocks
    outgoing_block_interceptors: RefCell<Vec<OutgoingBlockInterceptor>>,
}
132
133impl Debug for ComHub {
134    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
135        f.debug_struct("ComHub")
136            .field("endpoint", &self.endpoint)
137            .field("options", &self.options)
138            .field("sockets", &self.sockets)
139            .field(
140                "endpoint_sockets_blacklist",
141                &self.endpoint_sockets_blacklist,
142            )
143            .field("fallback_sockets", &self.fallback_sockets)
144            .field("endpoint_sockets", &self.endpoint_sockets)
145            .finish()
146    }
147}
148
/// Options for iterating over the sockets of an endpoint.
/// NOTE(review): the consumer of these options is not visible in this part
/// of the file; the field descriptions below are inferred from the names.
#[derive(Debug, Clone, Default)]
struct EndpointIterateOptions<'a> {
    /// presumably restricts the iteration to direct sockets - confirm
    pub only_direct: bool,
    /// presumably requires the endpoint instance to match exactly - confirm
    pub exact_instance: bool,
    /// sockets that are presumably skipped during iteration - confirm
    pub exclude_sockets: &'a [ComInterfaceSocketUUID],
}
155
/// Fallback routing priority of an interface.
///
/// The derived ordering follows the declaration order, so `None` compares
/// lower than every `Priority(_)`, and priorities compare by their number.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum InterfacePriority {
    /// The interface will not be used for fallback routing if no other interface is available
    /// This is useful for interfaces which cannot communicate with the outside world or are not
    /// capable of redirecting large amounts of data
    None,
    /// The interface will be used for fallback routing if no other interface is available,
    /// depending on the defined priority
    /// A higher number means a higher priority
    Priority(u16),
}
167
168impl Default for InterfacePriority {
169    fn default() -> Self {
170        InterfacePriority::Priority(0)
171    }
172}
173
/// Errors returned by [`ComHub`] operations.
#[derive(Debug, Clone, PartialEq)]
pub enum ComHubError {
    /// An error reported by an interface (e.g. by an interface factory).
    InterfaceError(ComInterfaceError),
    /// Opening the interface failed.
    InterfaceOpenFailed,
    /// Closing the interface failed.
    InterfaceCloseFailed,
    /// An interface with the same UUID is already registered.
    InterfaceAlreadyExists,
    /// No interface with the given UUID is registered.
    InterfaceDoesNotExist,
    /// The interface is not connected.
    InterfaceNotConnected,
    /// No factory is registered for the requested interface type.
    InterfaceTypeDoesNotExist,
    /// A fallback priority was requested for an interface whose direction
    /// is `InterfaceDirection::In`.
    InvalidInterfaceDirectionForFallbackInterface,
    /// No response was received.
    NoResponse,
    /// The block signature is missing or a crypto operation failed during
    /// block validation.
    SignatureError,
}
187
188impl Display for ComHubError {
189    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
190        match self {
191            ComHubError::InterfaceError(_msg) => {
192                core::write!(f, "ComHubError: ComInterfaceError")
193            }
194            ComHubError::InterfaceCloseFailed => {
195                core::write!(f, "ComHubError: Failed to close interface")
196            }
197            ComHubError::InterfaceNotConnected => {
198                core::write!(f, "ComHubError: Interface not connected")
199            }
200            ComHubError::InterfaceDoesNotExist => {
201                core::write!(f, "ComHubError: Interface does not exit")
202            }
203            ComHubError::InterfaceAlreadyExists => {
204                core::write!(f, "ComHubError: Interface already exists")
205            }
206            ComHubError::InterfaceTypeDoesNotExist => {
207                core::write!(f, "ComHubError: Type of interface does not exist")
208            }
209            ComHubError::InvalidInterfaceDirectionForFallbackInterface => {
210                core::write!(
211                    f,
212                    "ComHubError: Invalid direction for fallback interface"
213                )
214            }
215            ComHubError::NoResponse => {
216                core::write!(f, "ComHubError: No response")
217            }
218            ComHubError::InterfaceOpenFailed => {
219                core::write!(f, "ComHubError: Failed to open interface")
220            }
221            ComHubError::SignatureError => {
222                core::write!(f, "ComHubError: CryptoError")
223            }
224        }
225    }
226}
227
/// Errors that can occur when an endpoint is registered for a socket
/// (returned by `ComHub::register_socket_endpoint`).
#[derive(Debug)]
pub enum SocketEndpointRegistrationError {
    /// NOTE(review): presumably the socket is no longer connected - confirm.
    SocketDisconnected,
    /// NOTE(review): presumably the socket is not yet initialized - confirm.
    SocketUninitialized,
    /// The endpoint is already registered for the socket.
    SocketEndpointAlreadyRegistered,
}
234
235#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
236async fn update_loop_task(self_rc: Rc<ComHub>) {
237    while *self_rc.update_loop_running.borrow() {
238        self_rc.update().await;
239        sleep(Duration::from_millis(1)).await;
240    }
241    if let Some(sender) = self_rc.update_loop_stop_sender.borrow_mut().take() {
242        sender.send(()).expect("Failed to send stop signal");
243    }
244}
245
246#[cfg_attr(feature = "embassy_runtime", embassy_executor::task)]
247async fn reconnect_interface_task(interface_rc: Rc<RefCell<dyn ComInterface>>) {
248    let interface = interface_rc.clone();
249    let mut interface = interface.borrow_mut();
250
251    let config = interface.get_properties_mut();
252    config.close_timestamp = None;
253
254    let current_attempts = config.reconnect_attempts.unwrap_or(0);
255    config.reconnect_attempts = Some(current_attempts + 1);
256
257    let res = interface.handle_open().await;
258    if res {
259        interface.set_state(ComInterfaceState::Connected);
260        // config.reconnect_attempts = None;
261    } else {
262        interface.set_state(ComInterfaceState::NotConnected);
263    }
264}
265
266impl ComHub {
267    pub fn new(
268        endpoint: impl Into<Endpoint>,
269        async_context: AsyncContext,
270    ) -> ComHub {
271        ComHub {
272            endpoint: endpoint.into(),
273            async_context,
274            options: ComHubOptions::default(),
275            interface_factories: RefCell::new(HashMap::new()),
276            interfaces: RefCell::new(HashMap::new()),
277            endpoint_sockets: RefCell::new(HashMap::new()),
278            block_handler: BlockHandler::new(),
279            sockets: RefCell::new(HashMap::new()),
280            fallback_sockets: RefCell::new(Vec::new()),
281            endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
282            update_loop_running: RefCell::new(false),
283            update_loop_stop_sender: RefCell::new(None),
284            incoming_block_interceptors: RefCell::new(Vec::new()),
285            outgoing_block_interceptors: RefCell::new(Vec::new()),
286        }
287    }
288
289    pub async fn init(&self) -> Result<(), ComHubError> {
290        // add default local loopback interface
291        let local_interface = LocalLoopbackInterface::new();
292        self.open_and_add_interface(
293            Rc::new(RefCell::new(local_interface)),
294            InterfacePriority::None,
295        )
296        .await
297    }
298
299    /// Registers a new interface factory for a specific interface implementation.
300    /// This allows the ComHub to create new instances of the interface on demand.
301    pub fn register_interface_factory(
302        &self,
303        interface_type: String,
304        factory: ComInterfaceFactoryFn,
305    ) {
306        self.interface_factories
307            .borrow_mut()
308            .insert(interface_type, factory);
309    }
310
311    /// Creates a new interface instance using the registered factory
312    /// for the specified interface type if it exists.
313    /// The interface is opened and added to the ComHub.
314    pub async fn create_interface(
315        &self,
316        interface_type: &str,
317        setup_data: ValueContainer,
318        priority: InterfacePriority,
319    ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
320        info!("creating interface {interface_type}");
321        let interface_factories = self.interface_factories.borrow();
322        if let Some(factory) = interface_factories.get(interface_type) {
323            let interface =
324                factory(setup_data).map_err(ComHubError::InterfaceError)?;
325            drop(interface_factories);
326
327            self.open_and_add_interface(interface.clone(), priority)
328                .await
329                .map(|_| interface)
330        } else {
331            Err(ComHubError::InterfaceTypeDoesNotExist)
332        }
333    }
334
335    fn try_downcast<T: 'static>(
336        input: Rc<RefCell<dyn ComInterface>>,
337    ) -> Option<Rc<RefCell<T>>> {
338        // Try to get a reference to the inner value
339        if input.borrow().as_any().is::<T>() {
340            // SAFETY: We're ensuring T is the correct type via the check
341            let ptr = Rc::into_raw(input) as *const RefCell<T>;
342            unsafe { Some(Rc::from_raw(ptr)) }
343        } else {
344            None
345        }
346    }
347
348    /// Register an incoming block interceptor
349    pub fn register_incoming_block_interceptor<F>(&self, interceptor: F)
350    where
351        F: Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static,
352    {
353        self.incoming_block_interceptors
354            .borrow_mut()
355            .push(Box::new(interceptor));
356    }
357
358    /// Register an outgoing block interceptor
359    pub fn register_outgoing_block_interceptor<F>(&self, interceptor: F)
360    where
361        F: Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static,
362    {
363        self.outgoing_block_interceptors
364            .borrow_mut()
365            .push(Box::new(interceptor));
366    }
367
368    pub fn get_interface_by_uuid<T: ComInterface>(
369        &self,
370        interface_uuid: &ComInterfaceUUID,
371    ) -> Option<Rc<RefCell<T>>> {
372        ComHub::try_downcast(
373            self.interfaces.borrow().get(interface_uuid)?.0.clone(),
374        )
375    }
376
377    pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
378        self.interfaces.borrow().contains_key(interface_uuid)
379    }
380
381    pub fn get_dyn_interface_by_uuid(
382        &self,
383        uuid: &ComInterfaceUUID,
384    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
385        self.interfaces
386            .borrow()
387            .get(uuid)
388            .map(|(interface, _)| interface.clone())
389    }
390
391    pub async fn open_and_add_interface(
392        &self,
393        interface: Rc<RefCell<dyn ComInterface>>,
394        priority: InterfacePriority,
395    ) -> Result<(), ComHubError> {
396        if interface.borrow().get_state() != ComInterfaceState::Connected {
397            // If interface is not connected, open it
398            // and wait for it to be connected
399            // FIXME #240: borrow_mut across await point
400            if !(interface.borrow_mut().handle_open().await) {
401                return Err(ComHubError::InterfaceOpenFailed);
402            }
403        }
404        self.add_interface(interface.clone(), priority)
405    }
406
407    pub fn add_interface(
408        &self,
409        interface: Rc<RefCell<dyn ComInterface>>,
410        priority: InterfacePriority,
411    ) -> Result<(), ComHubError> {
412        let uuid = interface.borrow().get_uuid().clone();
413        let mut interfaces = self.interfaces.borrow_mut();
414        if interfaces.contains_key(&uuid) {
415            return Err(ComHubError::InterfaceAlreadyExists);
416        }
417
418        // make sure the interface can send if a priority is set
419        if priority != InterfacePriority::None
420            && interface.borrow_mut().get_properties().direction
421                == InterfaceDirection::In
422        {
423            return Err(
424                ComHubError::InvalidInterfaceDirectionForFallbackInterface,
425            );
426        }
427
428        interfaces.insert(uuid, (interface, priority));
429        Ok(())
430    }
431
432    /// User can proactively remove an interface from the hub.
433    /// This will destroy the interface and it's sockets (perform deep cleanup)
434    pub async fn remove_interface(
435        &self,
436        interface_uuid: ComInterfaceUUID,
437    ) -> Result<(), ComHubError> {
438        info!("Removing interface {interface_uuid}");
439        let interface = self
440            .interfaces
441            .borrow_mut()
442            .get_mut(&interface_uuid.clone())
443            .ok_or(ComHubError::InterfaceDoesNotExist)?
444            .0
445            .clone();
446        {
447            // Async close the interface (stop tasks, server, cleanup internal data)
448            // FIXME #176: borrow_mut should not be used here
449            let mut interface = interface.borrow_mut();
450            interface.handle_destroy().await;
451        }
452
453        // Remove old sockets from ComHub that have been deleted by the interface destroy_sockets()
454        self.update_sockets().await;
455
456        self.cleanup_interface(interface_uuid)
457            .ok_or(ComHubError::InterfaceDoesNotExist)?;
458
459        Ok(())
460    }
461
462    /// The internal cleanup function that removes the interface from the hub
463    /// and disabled the default interface if it was set to this interface
464    fn cleanup_interface(
465        &self,
466        interface_uuid: ComInterfaceUUID,
467    ) -> Option<Rc<RefCell<dyn ComInterface>>> {
468        let interface = self
469            .interfaces
470            .borrow_mut()
471            .remove(&interface_uuid)
472            .or(None)?
473            .0;
474        Some(interface)
475    }
476
477    pub(crate) async fn receive_block(
478        &self,
479        block: &DXBBlock,
480        socket_uuid: ComInterfaceSocketUUID,
481    ) {
482        info!("{} received block: {}", self.endpoint, block);
483
484        // ignore invalid blocks (e.g. invalid signature)
485        match self.validate_block(block).await {
486            Ok(true) => { /* Ignored */ }
487            Ok(false) => {
488                warn!("Block validation failed. Dropping block...");
489                return;
490            }
491            Err(e) => {
492                warn!("Error in block validation {e}. Dropping block...");
493                return;
494            }
495        }
496
497        for interceptor in self.incoming_block_interceptors.borrow().iter() {
498            interceptor(block, &socket_uuid);
499        }
500
501        let block_type = block.block_header.flags_and_timestamp.block_type();
502
503        // register in block history
504        let is_new_block = !self.block_handler.is_block_in_history(block);
505        // assign endpoint to socket if none is assigned
506        // only if a new block and the sender in not the local endpoint
507        if is_new_block && block.routing_header.sender != self.endpoint {
508            self.register_socket_endpoint_from_incoming_block(
509                socket_uuid.clone(),
510                block,
511            );
512        }
513
514        let receivers = block.receiver_endpoints();
515        if !receivers.is_empty() {
516            let is_for_own = receivers.iter().any(|e| {
517                e == &self.endpoint
518                    || e == &Endpoint::ANY
519                    || e == &Endpoint::ANY_ALL_INSTANCES
520            });
521
522            // handle blocks for own endpoint
523            if is_for_own && block_type != BlockType::Hello {
524                info!("Block is for this endpoint");
525
526                match block_type {
527                    BlockType::Trace => {
528                        self.handle_trace_block(block, socket_uuid.clone())
529                            .await;
530                    }
531                    BlockType::TraceBack => {
532                        self.handle_trace_back_block(
533                            block,
534                            socket_uuid.clone(),
535                        );
536                    }
537                    _ => {
538                        self.block_handler.handle_incoming_block(block.clone());
539                    }
540                };
541            }
542
543            // TODO #177: handle this via TTL, not explicitly for Hello blocks
544            let should_relay =
545                // don't relay "Hello" blocks sent to own endpoint
546                !(
547                    is_for_own && block_type == BlockType::Hello
548                );
549
550            // relay the block to other endpoints
551            if should_relay {
552                // get all receivers that the block must be relayed to
553                let remaining_receivers = if is_for_own {
554                    &self.get_remote_receivers(&receivers)
555                } else {
556                    &receivers
557                };
558
559                // relay the block to all receivers
560                if !remaining_receivers.is_empty() {
561                    match block_type {
562                        BlockType::Trace | BlockType::TraceBack => {
563                            self.redirect_trace_block(
564                                block.clone_with_new_receivers(
565                                    remaining_receivers,
566                                ),
567                                socket_uuid.clone(),
568                                is_for_own,
569                            );
570                        }
571                        _ => {
572                            self.redirect_block(
573                                block.clone_with_new_receivers(
574                                    remaining_receivers,
575                                ),
576                                socket_uuid.clone(),
577                                is_for_own,
578                            );
579                        }
580                    }
581                }
582            }
583        }
584
585        // add to block history
586        if is_new_block {
587            self.block_handler
588                .add_block_to_history(block, Some(socket_uuid));
589        }
590    }
591
592    /// Returns a list of all receivers from a given ReceiverEndpoints
593    /// excluding the local endpoint
594    fn get_remote_receivers(
595        &self,
596        receiver_endpoints: &[Endpoint],
597    ) -> Vec<Endpoint> {
598        receiver_endpoints
599            .iter()
600            .filter(|e| e != &&self.endpoint)
601            .cloned()
602            .collect::<Vec<_>>()
603    }
604
605    /// Registers the socket endpoint from an incoming block
606    /// if the endpoint is not already registered for the socket
607    fn register_socket_endpoint_from_incoming_block(
608        &self,
609        socket_uuid: ComInterfaceSocketUUID,
610        block: &DXBBlock,
611    ) {
612        let socket = self.get_socket_by_uuid(&socket_uuid);
613        let mut socket_ref = socket.try_lock().unwrap();
614
615        let distance = block.routing_header.distance;
616        let sender = block.routing_header.sender.clone();
617
618        // set as direct endpoint if distance = 0
619        if socket_ref.direct_endpoint.is_none() && distance == 1 {
620            info!(
621                "Setting direct endpoint for socket {}: {}",
622                socket_ref.uuid, sender
623            );
624            socket_ref.direct_endpoint = Some(sender.clone());
625        }
626
627        drop(socket_ref);
628        match self.register_socket_endpoint(
629            socket.clone(),
630            sender.clone(),
631            distance,
632        ) {
633            Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
634                debug!(
635                    "Socket already registered for endpoint {sender}",
636                );
637            }
638            Err(error) => {
639                core::panic!("Failed to register socket endpoint {sender}: {error:?}");
640            },
641            Ok(_) => { }
642        }
643    }
644
645    /// Prepares a block and relays it to the given receivers.
646    /// The routing distance is incremented by 1.
647    pub(crate) fn redirect_block(
648        &self,
649        mut block: DXBBlock,
650        incoming_socket: ComInterfaceSocketUUID,
651        // only for debugging traces
652        forked: bool,
653    ) {
654        let receivers = block.receiver_endpoints();
655
656        // check if block has already passed this endpoint (-> bounced back block)
657        // and add to blacklist for all receiver endpoints
658        let history_block_data =
659            self.block_handler.get_block_data_from_history(&block);
660        if history_block_data.is_some() {
661            for receiver in &receivers {
662                if receiver != &self.endpoint {
663                    info!(
664                        "{}: Adding socket {} to blacklist for receiver {}",
665                        self.endpoint, incoming_socket, receiver
666                    );
667                    self.endpoint_sockets_blacklist
668                        .borrow_mut()
669                        .entry(receiver.clone())
670                        .or_default()
671                        .insert(incoming_socket.clone());
672                }
673            }
674        }
675
676        // increment distance for next hop
677        block.routing_header.distance += 1;
678
679        // ensure ttl is >= 1
680        // decrease TTL by 1
681        if block.routing_header.ttl > 1 {
682            block.routing_header.ttl -= 1;
683        }
684        // if ttl becomes 0 after decrement drop the block
685        else if block.routing_header.ttl == 1 {
686            block.routing_header.ttl -= 1;
687            warn!("Block TTL expired. Dropping block...");
688            return;
689        // else ttl must be zero
690        } else {
691            warn!("Block TTL expired. Dropping block...");
692            return;
693        }
694
695        let mut prefer_incoming_socket_for_bounce_back = false;
696        // if we are the original sender of the block, don't send again (prevent loop) and send
697        // bounce back block with all receivers
698        let res = {
699            if block.routing_header.sender == self.endpoint {
700                // if not bounce back block, directly send back to incoming socket (prevent loop)
701                prefer_incoming_socket_for_bounce_back =
702                    !block.is_bounce_back();
703                Err(receivers.to_vec())
704            } else {
705                let mut excluded_sockets = vec![incoming_socket.clone()];
706                if let Some(BlockHistoryData {
707                    original_socket_uuid: Some(original_socket_uuid),
708                }) = &history_block_data
709                {
710                    excluded_sockets.push(original_socket_uuid.clone())
711                }
712                self.send_block(block.clone(), excluded_sockets, forked)
713            }
714        };
715
716        // send block for unreachable endpoints back to the sender
717        if let Err(unreachable_endpoints) = res {
718            // try to send back to original socket
719            // if already in history, get original socket from history
720            // otherwise, directly send back to the incoming socket
721            let send_back_socket = if !prefer_incoming_socket_for_bounce_back
722                && let Some(history_block_data) = history_block_data
723            {
724                history_block_data.original_socket_uuid
725            } else {
726                Some(incoming_socket.clone())
727            };
728
729            // If a send_back_socket is set, the original block is not from this endpoint,
730            // so we can send it back to the original socket
731            if let Some(send_back_socket) = send_back_socket {
732                // never send a bounce back block back again to the incoming socket
733                if block.is_bounce_back() && send_back_socket == incoming_socket
734                {
735                    warn!(
736                        "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
737                        self.endpoint
738                    );
739                } else if self
740                    .get_socket_by_uuid(&send_back_socket)
741                    .try_lock()
742                    .unwrap()
743                    .can_send()
744                {
745                    block.set_bounce_back(true);
746                    self.send_block_to_endpoints_via_socket(
747                        block,
748                        &send_back_socket,
749                        &unreachable_endpoints,
750                        if forked { Some(0) } else { None },
751                    )
752                } else {
753                    error!(
754                        "Tried to send bounce back block, but cannot send back to incoming socket"
755                    )
756                }
757            }
758            // Otherwise, the block originated from this endpoint, we can just call send again
759            // and try to send it via other remaining sockets that are not on the blacklist for the
760            // block receiver
761            else {
762                self.send_block(block, vec![], forked).unwrap_or_else(|_| {
763                    error!(
764                        "Failed to send out block to {}",
765                        unreachable_endpoints
766                            .iter()
767                            .map(|e| e.to_string())
768                            .join(",")
769                    );
770                });
771            }
772        }
773    }
774
775    /// Validates a block including it's signature if set
776    /// TODO #378 @Norbert
777    pub async fn validate_block(
778        &self,
779        block: &DXBBlock,
780    ) -> Result<bool, ComHubError> {
781        // TODO #179 check for creation time, withdraw if too old (TBD) or in the future
782
783        let is_signed =
784            block.routing_header.flags.signature_type() != SignatureType::None;
785
786        match is_signed {
787            true => {
788                // TODO #180: verify signature and abort if invalid
789                // Check if signature is following in some later block and add them to
790                // a pool of incoming blocks awaiting some signature
791                cfg_if::cfg_if! {
792                    if #[cfg(feature = "native_crypto")] {
793                        use crate::runtime::global_context::get_global_context;
794                        match block.routing_header.flags.signature_type() {
795                            SignatureType::Encrypted => {
796                                let crypto = get_global_context().crypto;
797                                let raw_sign = block.signature
798                                    .as_ref()
799                                    .ok_or(ComHubError::SignatureError)?;
800                                let (enc_sign, pub_key) = raw_sign.split_at(64);
801                                let hash = crypto.hkdf_sha256(pub_key, &[0u8; 16])
802                                    .await
803                                    .map_err(|_| ComHubError::SignatureError)?;
804                                let signature = crypto
805                                    .aes_ctr_decrypt(&hash, &[0u8; 16], enc_sign)
806                                    .await
807                                    .map_err(|_| ComHubError::SignatureError)?;
808
809                                let raw_signed = [
810                                    pub_key,
811                                    &block.body.clone()
812                                    ]
813                                    .concat();
814                                let hashed_signed = crypto
815                                    .hash_sha256(&raw_signed)
816                                    .await
817                                    .map_err(|_| ComHubError::SignatureError)?;
818
819                                let ver = crypto
820                                    .ver_ed25519(pub_key, &signature, &hashed_signed)
821                                    .await
822                                    .map_err(|_| ComHubError::SignatureError)?;
823                                return Ok(ver);
824                            },
825                            SignatureType::Unencrypted => {
826                                let crypto = get_global_context().crypto;
827                                let raw_sign = block.signature
828                                    .as_ref()
829                                    .ok_or(ComHubError::SignatureError)?;
830                                let (signature, pub_key) = raw_sign.split_at(64);
831
832                                let raw_signed = [
833                                    pub_key,
834                                    &block.body.clone()
835                                    ]
836                                    .concat();
837                                let hashed_signed = crypto
838                                    .hash_sha256(&raw_signed)
839                                    .await
840                                    .map_err(|_| ComHubError::SignatureError)?;
841
842                                let ver = crypto
843                                    .ver_ed25519(pub_key, signature, &hashed_signed)
844                                    .await
845                                    .map_err(|_| ComHubError::SignatureError)?;
846                                return Ok(ver);
847                            },
848                            SignatureType::None => {
849                                unreachable!("If (is_signed == true) => !None");
850                            }
851                        }
852                    }
853                    else {
854                        Ok(true)
855                    }
856                }
857            }
858            false => {
859                let endpoint = block.routing_header.sender.clone();
860                let is_trusted = {
861                    cfg_if::cfg_if! {
862                        if #[cfg(feature = "debug")] {
863                            use crate::runtime::global_context::get_global_context;
864                            get_global_context().debug_flags.allow_unsigned_blocks
865                        }
866                        else {
867                            // TODO #181 Check if the sender is trusted (endpoint + interface) connection
868                            false
869                        }
870                    }
871                };
872                match is_trusted {
873                    true => Ok(true),
874                    false => {
875                        warn!(
876                            "Block received by {endpoint} is not signed. Dropping block..."
877                        );
878                        Ok(false)
879                    }
880                }
881            }
882        }
883    }
884
885    /// Registers a new endpoint that is reachable over the socket if the socket is not
886    /// already registered, it will be added to the socket list.
887    /// If the provided endpoint is not the same as the socket endpoint, it is registered
888    /// as an indirect socket to the endpoint
889    pub fn register_socket_endpoint(
890        &self,
891        socket: Arc<Mutex<ComInterfaceSocket>>,
892        endpoint: Endpoint,
893        distance: i8,
894    ) -> Result<(), SocketEndpointRegistrationError> {
895        log::info!(
896            "{} registering endpoint {} for socket {}",
897            self.endpoint,
898            endpoint,
899            socket.try_lock().unwrap().uuid
900        );
901        let socket_ref = socket.try_lock().unwrap();
902
903        // if the registered endpoint is the same as the socket endpoint,
904        // this is a direct socket to the endpoint
905        let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
906
907        // cannot register endpoint if socket is not connected
908        if !socket_ref.state.is_open() {
909            return Err(SocketEndpointRegistrationError::SocketDisconnected);
910        }
911
912        // check if the socket is already registered for the endpoint
913        if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
914            && entries
915                .iter()
916                .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
917        {
918            return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
919        }
920
921        let socket_uuid = socket_ref.uuid.clone();
922        let channel_factor = socket_ref.channel_factor;
923        let direction = socket_ref.direction.clone();
924        drop(socket_ref);
925
926        // add endpoint to socket endpoint list
927        self.add_socket_endpoint(&socket_uuid, endpoint.clone());
928
929        // add socket to endpoint socket list
930        self.add_endpoint_socket(
931            &endpoint,
932            socket_uuid,
933            distance,
934            is_direct,
935            channel_factor,
936            direction,
937        );
938
939        // resort sockets for endpoint
940        self.sort_sockets(&endpoint);
941
942        Ok(())
943    }
944
945    /// Waits for all background tasks scheduled by the update() function to finish
946    /// This includes block flushes from `flush_outgoing_blocks()`
947    /// and interface (re)-connections from `update_interfaces()`
948    pub async fn wait_for_update_async(&self) {
949        loop {
950            let mut is_done = true;
951            for interface in self.interfaces.borrow().values() {
952                let interface = interface.0.clone();
953                let interface = interface.borrow_mut();
954                let outgoing_blocks_count =
955                    interface.get_info().outgoing_blocks_count.get();
956                // blocks are still sent out on this interface
957                if outgoing_blocks_count > 0 {
958                    is_done = false;
959                    break;
960                }
961                // interface is still in connection task
962                if interface.get_state() == ComInterfaceState::Connecting {
963                    is_done = false;
964                    break;
965                }
966            }
967            if is_done {
968                break;
969            }
970            sleep(Duration::from_millis(10)).await;
971        }
972    }
973
974    /// Updates all sockets and interfaces,
975    /// collecting incoming data and sending out queued blocks.
976    /// This function will wait for all background tasks scheduled by the update() function to finish
977    pub async fn update_async(&self) {
978        self.update().await;
979        self.wait_for_update_async().await;
980    }
981
982    /// Adds a socket to the socket list for a specific endpoint,
983    /// attaching metadata as DynamicEndpointProperties
984    fn add_endpoint_socket(
985        &self,
986        endpoint: &Endpoint,
987        socket_uuid: ComInterfaceSocketUUID,
988        distance: i8,
989        is_direct: bool,
990        channel_factor: u32,
991        direction: InterfaceDirection,
992    ) {
993        let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
994        if !endpoint_sockets.contains_key(endpoint) {
995            endpoint_sockets.insert(endpoint.clone(), Vec::new());
996        }
997
998        let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
999        endpoint_sockets.push((
1000            socket_uuid,
1001            DynamicEndpointProperties {
1002                known_since: Time::now(),
1003                distance,
1004                is_direct,
1005                channel_factor,
1006                direction,
1007            },
1008        ));
1009    }
1010
1011    /// Adds a socket to the socket list.
1012    /// If the priority is not set to `InterfacePriority::None`, the socket
1013    /// is also registered as a fallback socket for outgoing connections with the
1014    /// specified priority.
1015    async fn add_socket(
1016        &self,
1017        socket: Arc<Mutex<ComInterfaceSocket>>,
1018        priority: InterfacePriority,
1019    ) -> Result<(), ComHubError> {
1020        let socket_ref = socket.try_lock().unwrap();
1021        let socket_uuid = socket_ref.uuid.clone();
1022        if self.sockets.borrow().contains_key(&socket_ref.uuid) {
1023            core::panic!("Socket {} already exists in ComHub", socket_ref.uuid);
1024        }
1025
1026        // info!(
1027        //     "Adding socket {} to ComHub with priority {:?}",
1028        //     socket_ref.uuid, priority
1029        // );
1030
1031        if !socket_ref.can_send() && priority != InterfacePriority::None {
1032            core::panic!(
1033                "Socket {} cannot be used for fallback routing, since it has no send capability",
1034                socket_ref.uuid
1035            );
1036        }
1037        let direction = socket_ref.direction.clone();
1038
1039        self.sockets
1040            .borrow_mut()
1041            .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
1042
1043        // add outgoing socket to fallback sockets list if they have a priority flag
1044        if socket_ref.can_send() {
1045            match priority {
1046                InterfacePriority::None => {
1047                    // do nothing
1048                }
1049                InterfacePriority::Priority(priority) => {
1050                    // add socket to fallback sockets list
1051                    self.add_fallback_socket(&socket_uuid, priority, direction);
1052                }
1053            }
1054
1055            // send empty block to socket to say hello
1056            let mut block: DXBBlock = DXBBlock::default();
1057            block
1058                .block_header
1059                .flags_and_timestamp
1060                .set_block_type(BlockType::Hello);
1061            block
1062                .routing_header
1063                .flags
1064                .set_signature_type(SignatureType::Unencrypted);
1065            // TODO #182 include fingerprint of the own public key into body
1066
1067            let block = self.prepare_own_block(block).await?;
1068
1069            drop(socket_ref);
1070            self.send_block_to_endpoints_via_socket(
1071                block,
1072                &socket_uuid,
1073                &[Endpoint::ANY],
1074                None,
1075            );
1076        }
1077        Ok(())
1078    }
1079
1080    /// Registers a socket as a fallback socket for outgoing connections
1081    /// that can be used if no known route exists for an endpoint
1082    /// Note: only sockets that support sending data should be used as fallback sockets
1083    fn add_fallback_socket(
1084        &self,
1085        socket_uuid: &ComInterfaceSocketUUID,
1086        priority: u16,
1087        direction: InterfaceDirection,
1088    ) {
1089        // add to vec
1090        let mut fallback_sockets = self.fallback_sockets.borrow_mut();
1091        fallback_sockets.push((socket_uuid.clone(), priority, direction));
1092        // first sort by direction (InOut before Out - only In is not allowed)
1093        // second sort by priority
1094        fallback_sockets.sort_by_key(|(_, priority, direction)| {
1095            let dir_rank = match direction {
1096                InterfaceDirection::InOut => 0,
1097                InterfaceDirection::Out => 1,
1098                InterfaceDirection::In => {
1099                    core::panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
1100                }
1101            };
1102            (dir_rank, core::cmp::Reverse(*priority))
1103        });
1104    }
1105
1106    /// Removes a socket from the socket list
1107    fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
1108        self.sockets.borrow_mut().remove(socket_uuid).or_else(|| {
1109            core::panic!("Socket {socket_uuid} not found in ComHub")
1110        });
1111
1112        // remove socket from endpoint socket list
1113        // remove endpoint key from endpoint_sockets if not sockets present
1114        self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
1115            sockets.retain(|(uuid, _)| uuid != socket_uuid);
1116            !sockets.is_empty()
1117        });
1118
1119        // remove socket if it is the default socket
1120        self.fallback_sockets
1121            .borrow_mut()
1122            .retain(|(uuid, _, _)| uuid != socket_uuid);
1123    }
1124
1125    /// Adds an endpoint to the endpoint list of a specific socket
1126    fn add_socket_endpoint(
1127        &self,
1128        socket_uuid: &ComInterfaceSocketUUID,
1129        endpoint: Endpoint,
1130    ) {
1131        core::assert!(
1132            self.sockets.borrow().contains_key(socket_uuid),
1133            "Socket not found in ComHub"
1134        );
1135        // add endpoint to socket endpoint list
1136        self.sockets
1137            .borrow_mut()
1138            .get_mut(socket_uuid)
1139            .unwrap()
1140            .1
1141            .insert(endpoint.clone());
1142    }
1143
1144    /// Sorts the sockets for an endpoint:
1145    /// - socket with send capability first
1146    /// - then direct sockets
1147    /// - then sort by channel channel_factor (latency, bandwidth)
1148    /// - then sort by socket connect_timestamp
1149    ///
1150    /// When the global debug flag `enable_deterministic_behavior` is set,
1151    /// Sockets are not sorted by their connect_timestamp to make sure that the order of
1152    /// received blocks has no effect on the routing priorities
1153    fn sort_sockets(&self, endpoint: &Endpoint) {
1154        let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
1155        let sockets = endpoint_sockets.get_mut(endpoint).unwrap();
1156
1157        sockets.sort_by(|(_, a), (_, b)| {
1158            // sort by channel_factor
1159            b.is_direct
1160                .cmp(&a.is_direct)
1161                .then_with(|| b.channel_factor.cmp(&a.channel_factor))
1162                .then_with(|| b.distance.cmp(&a.distance))
1163                .then_with(
1164                    || {
1165                        cfg_if::cfg_if! {
1166                            if #[cfg(feature = "debug")] {
1167                                use crate::runtime::global_context::get_global_context;
1168                                use core::cmp::Ordering;
1169                                if get_global_context().debug_flags.enable_deterministic_behavior {
1170                                    Ordering::Equal
1171                                }
1172                                else {
1173                                    b.known_since.cmp(&a.known_since)
1174                                }
1175                            }
1176                            else {
1177                                b.known_since.cmp(&a.known_since)
1178                            }
1179                        }
1180                    }
1181                )
1182
1183        });
1184    }
1185
1186    /// Returns the socket for a given UUID
1187    /// The socket must be registered in the ComHub,
1188    /// otherwise a panic will be triggered
1189    pub(crate) fn get_socket_by_uuid(
1190        &self,
1191        socket_uuid: &ComInterfaceSocketUUID,
1192    ) -> Arc<Mutex<ComInterfaceSocket>> {
1193        self.sockets
1194            .borrow()
1195            .get(socket_uuid)
1196            .map(|socket| socket.0.clone())
1197            .unwrap_or_else(|| {
1198                core::panic!("Socket for uuid {socket_uuid} not found")
1199            })
1200    }
1201
1202    /// Returns the com interface for a given UUID
1203    /// The interface must be registered in the ComHub,
1204    /// otherwise a panic will be triggered
1205    pub(crate) fn get_com_interface_by_uuid(
1206        &self,
1207        interface_uuid: &ComInterfaceUUID,
1208    ) -> Rc<RefCell<dyn ComInterface>> {
1209        self.interfaces
1210            .borrow()
1211            .get(interface_uuid)
1212            .unwrap_or_else(|| {
1213                core::panic!("Interface for uuid {interface_uuid} not found")
1214            })
1215            .0
1216            .clone()
1217    }
1218
1219    /// Returns the com interface for a given socket UUID
1220    /// The interface and socket must be registered in the ComHub,
1221    /// otherwise a panic will be triggered
1222    pub(crate) fn get_com_interface_from_socket_uuid(
1223        &self,
1224        socket_uuid: &ComInterfaceSocketUUID,
1225    ) -> Rc<RefCell<dyn ComInterface>> {
1226        let socket = self.get_socket_by_uuid(socket_uuid);
1227        let socket = socket.try_lock().unwrap();
1228        self.get_com_interface_by_uuid(&socket.interface_uuid)
1229    }
1230
1231    /// Returns an iterator over all sockets for a given endpoint
1232    /// The sockets are yielded in the order of their priority, starting with the
1233    /// highest priority socket (the best socket for sending data to the endpoint)
1234    fn iterate_endpoint_sockets<'a>(
1235        &'a self,
1236        endpoint: &'a Endpoint,
1237        options: EndpointIterateOptions<'a>,
1238    ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1239        core::iter::from_coroutine(
1240            #[coroutine]
1241            move || {
1242                let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1243                // TODO #183: can we optimize this to avoid cloning the endpoint_sockets vector?
1244                let endpoint_sockets =
1245                    endpoint_sockets_borrow.get(endpoint).cloned();
1246                if endpoint_sockets.is_none() {
1247                    return;
1248                }
1249                for (socket_uuid, _) in endpoint_sockets.unwrap() {
1250                    {
1251                        let socket = self.get_socket_by_uuid(&socket_uuid);
1252                        let socket = socket.try_lock().unwrap();
1253
1254                        // check if only_direct is set and the endpoint equals the direct endpoint of the socket
1255                        if options.only_direct
1256                            && socket.direct_endpoint.is_some()
1257                            && socket.direct_endpoint.as_ref().unwrap()
1258                                == endpoint
1259                        {
1260                            debug!(
1261                                "No direct socket found for endpoint {endpoint}. Skipping..."
1262                            );
1263                            continue;
1264                        }
1265
1266                        // check if the socket is excluded if exclude_socket is set
1267                        if options.exclude_sockets.contains(&socket.uuid) {
1268                            debug!(
1269                                "Socket {} is excluded for endpoint {}. Skipping...",
1270                                socket.uuid, endpoint
1271                            );
1272                            continue;
1273                        }
1274
1275                        // TODO #184 optimize and separate outgoing/non-outgoing sockets for endpoint
1276                        // only yield outgoing sockets
1277                        // if a non-outgoing socket is found, all following sockets
1278                        // will also be non-outgoing
1279                        if !socket.can_send() {
1280                            info!(
1281                                "Socket {} is not outgoing for endpoint {}. Skipping...",
1282                                socket.uuid, endpoint
1283                            );
1284                            return;
1285                        }
1286                    }
1287
1288                    debug!(
1289                        "Found matching socket {socket_uuid} for endpoint {endpoint}"
1290                    );
1291                    yield socket_uuid.clone()
1292                }
1293            },
1294        )
1295    }
1296
1297    /// Finds the best matching socket over which an endpoint is known to be reachable.
1298    fn find_known_endpoint_socket(
1299        &self,
1300        endpoint: &Endpoint,
1301        exclude_socket: &[ComInterfaceSocketUUID],
1302    ) -> Option<ComInterfaceSocketUUID> {
1303        match endpoint.instance {
1304            // find socket for any endpoint instance
1305            EndpointInstance::Any => {
1306                let options = EndpointIterateOptions {
1307                    only_direct: false,
1308                    exact_instance: false,
1309                    exclude_sockets: exclude_socket,
1310                };
1311                if let Some(socket) =
1312                    self.iterate_endpoint_sockets(endpoint, options).next()
1313                {
1314                    return Some(socket);
1315                }
1316                None
1317            }
1318
1319            // find socket for exact instance
1320            EndpointInstance::Instance(_) => {
1321                // iterate over all sockets of all interfaces
1322                let options = EndpointIterateOptions {
1323                    only_direct: false,
1324                    exact_instance: true,
1325                    exclude_sockets: exclude_socket,
1326                };
1327                if let Some(socket) =
1328                    self.iterate_endpoint_sockets(endpoint, options).next()
1329                {
1330                    return Some(socket);
1331                }
1332                None
1333            }
1334            // TODO #185: how to handle broadcasts?
1335            EndpointInstance::All => {
1336                core::todo!("#186 Undescribed by author.")
1337            }
1338        }
1339    }
1340    /// Finds the best socket over which to send a block to an endpoint.
1341    /// If a known socket is found, it is returned, otherwise the default socket is returned, if it
1342    /// exists and is not excluded.
1343    fn find_best_endpoint_socket(
1344        &self,
1345        endpoint: &Endpoint,
1346        exclude_sockets: &[ComInterfaceSocketUUID],
1347    ) -> Option<ComInterfaceSocketUUID> {
1348        // if the endpoint is the same as the hub endpoint, try to find an interface
1349        // that redirects @@local
1350        if endpoint == &self.endpoint
1351            && let Some(socket) = self
1352                .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1353        {
1354            return Some(socket);
1355        }
1356
1357        // find best known socket for endpoint
1358        let matching_socket =
1359            self.find_known_endpoint_socket(endpoint, exclude_sockets);
1360
1361        // if a matching socket is found, return it
1362        if matching_socket.is_some() {
1363            matching_socket
1364        }
1365        // otherwise, return the highest priority socket that is not excluded
1366        else {
1367            let sockets = self.fallback_sockets.borrow();
1368            for (socket_uuid, _, _) in sockets.iter() {
1369                let socket = self.get_socket_by_uuid(socket_uuid);
1370                info!(
1371                    "{}: Find best for {}: {} ({}); excluded:{}",
1372                    self.endpoint,
1373                    endpoint,
1374                    socket_uuid,
1375                    socket
1376                        .try_lock()
1377                        .unwrap()
1378                        .direct_endpoint
1379                        .clone()
1380                        .map(|e| e.to_string())
1381                        .unwrap_or("None".to_string()),
1382                    exclude_sockets.contains(socket_uuid)
1383                );
1384                if !exclude_sockets.contains(socket_uuid) {
1385                    return Some(socket_uuid.clone());
1386                }
1387            }
1388            None
1389        }
1390    }
1391
1392    /// Returns all receivers to which the block has to be sent, grouped by the
1393    /// outbound socket uuids
1394    fn get_outbound_receiver_groups(
1395        &self,
1396        // TODO #187: do we need the block here for additional information (match conditions),
1397        // otherwise receivers are enough
1398        block: &DXBBlock,
1399        mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1400    ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1401        let receivers = block.receiver_endpoints();
1402
1403        if !receivers.is_empty() {
1404            let endpoint_sockets = receivers
1405                .iter()
1406                .map(|e| {
1407                    // add sockets from endpoint blacklist
1408                    if let Some(blacklist) =
1409                        self.endpoint_sockets_blacklist.borrow().get(e)
1410                    {
1411                        exclude_sockets.extend(blacklist.iter().cloned());
1412                    }
1413                    let socket =
1414                        self.find_best_endpoint_socket(e, &exclude_sockets);
1415                    (socket, e)
1416                })
1417                .group_by(|(socket, _)| socket.clone())
1418                .into_iter()
1419                .map(|(socket, group)| {
1420                    let endpoints = group
1421                        .map(|(_, endpoint)| endpoint.clone())
1422                        .collect::<Vec<_>>();
1423                    (socket, endpoints)
1424                })
1425                .collect::<Vec<_>>();
1426            Some(endpoint_sockets)
1427        } else {
1428            None
1429        }
1430    }
1431
1432    /// Runs the update loop for the ComHub.
1433    /// This method will continuously handle incoming data, send out
1434    /// queued blocks and update the sockets.
1435    /// This is only used for internal tests - in a full runtime setup, the main runtime update loop triggers
1436    /// ComHub updates.
1437    pub fn _start_update_loop(self_rc: Rc<Self>) {
1438        // if already running, do nothing
1439        if *self_rc.update_loop_running.borrow() {
1440            return;
1441        }
1442
1443        // set update loop running flag
1444        *self_rc.update_loop_running.borrow_mut() = true;
1445
1446        spawn_with_panic_notify(
1447            &self_rc.clone().async_context,
1448            update_loop_task(self_rc),
1449        );
1450    }
1451
1452    /// Update all sockets and interfaces,
1453    /// collecting incoming data and sending out queued blocks.
1454    /// Updates are scheduled in local tasks and are not immediately visible.
1455    /// To wait for the block update to finish, use `wait_for_update_async()`.
1456    pub async fn update(&self) {
1457        // update all interfaces
1458        self.update_interfaces();
1459
1460        // update own socket lists for routing
1461        self.update_sockets().await;
1462
1463        // update sockets block collectors
1464        self.collect_incoming_data().await;
1465
1466        // receive blocks from all sockets
1467        self.receive_incoming_blocks().await;
1468
1469        // send all queued blocks from all interfaces
1470        self.flush_outgoing_blocks();
1471    }
1472
1473    /// Prepares a block for sending out by updating the creation timestamp,
1474    /// sender and add signature and encryption if needed.
1475    /// TODO #379 @Norbert
1476    pub async fn prepare_own_block(
1477        &self,
1478        mut block: DXBBlock,
1479    ) -> Result<DXBBlock, ComHubError> {
1480        // TODO #188 signature & encryption
1481        cfg_if::cfg_if! {
1482            if #[cfg(feature = "native_crypto")] {
1483                use crate::runtime::global_context::get_global_context;
1484                match block.routing_header.flags.signature_type() {
1485                    SignatureType::Encrypted => {
1486                        let crypto = get_global_context().crypto;
1487                        let (pub_key, pri_key) = crypto.gen_ed25519()
1488                                .await
1489                                .map_err(|_| ComHubError::SignatureError)?;
1490
1491                        let raw_signed = [
1492                            pub_key.clone(),
1493                            block.body.clone()
1494                            ]
1495                            .concat();
1496                        let hashed_signed = crypto
1497                            .hash_sha256(&raw_signed)
1498                            .await
1499                            .map_err(|_| ComHubError::SignatureError)?;
1500
1501                        let signature = crypto
1502                            .sig_ed25519(&pri_key, &hashed_signed)
1503                            .await
1504                            .map_err(|_| ComHubError::SignatureError)?;
1505                        let hash = crypto
1506                            .hkdf_sha256(&pub_key, &[0u8; 16])
1507                            .await
1508                            .map_err(|_| ComHubError::SignatureError)?;
1509                        let enc_sig = crypto
1510                            .aes_ctr_encrypt(&hash, &[0u8; 16], &signature)
1511                            .await
1512                            .map_err(|_| ComHubError::SignatureError)?;
1513                        // 64 + 44 = 108
1514                        block.signature =
1515                            Some([enc_sig.to_vec(), pub_key].concat());
1516                    }
1517                    SignatureType::Unencrypted => {
1518                        let crypto = get_global_context().crypto;
1519                        let (pub_key, pri_key) = crypto.gen_ed25519()
1520                            .await
1521                            .map_err(|_| ComHubError::SignatureError)?;
1522
1523                        let raw_signed = [
1524                            pub_key.clone(),
1525                            block.body.clone()
1526                            ]
1527                            .concat();
1528                        let hashed_signed = crypto
1529                            .hash_sha256(&raw_signed)
1530                            .await
1531                            .map_err(|_| ComHubError::SignatureError)?;
1532
1533                        let signature = crypto
1534                            .sig_ed25519(&pri_key, &hashed_signed)
1535                            .await
1536                            .map_err(|_| ComHubError::SignatureError)?;
1537                        // 64 + 44 = 108
1538                        block.signature =
1539                            Some([signature.to_vec(), pub_key].concat());
1540                    }
1541                    SignatureType::None => {
1542                        /* Ignored */
1543                    }
1544                }
1545            }
1546        }
1547
1548        let now = Time::now();
1549        block.routing_header.sender = self.endpoint.clone();
1550        block
1551            .block_header
1552            .flags_and_timestamp
1553            .set_creation_timestamp(now);
1554
1555        // set distance to 1
1556        block.routing_header.distance = 1;
1557        Ok(block)
1558    }
1559
1560    /// Public method to send an outgoing block from this endpoint. Called by the runtime.
1561    pub async fn send_own_block(
1562        &self,
1563        mut block: DXBBlock,
1564    ) -> Result<(), Vec<Endpoint>> {
1565        block = self.prepare_own_block(block).await.map_err(|_| vec![])?;
1566        // add own outgoing block to history
1567        self.block_handler.add_block_to_history(&block, None);
1568        self.send_block(block, vec![], false)
1569    }
1570
1571    /// Sends a block and wait for a response block.
1572    /// Fix number of exact endpoints -> Expected responses are known at send time.
1573    /// TODO #189: make sure that mutating blocks are always send to specific endpoint instances (@jonas/0001), not generic endpoints like @jonas.
1574    /// @jonas -> response comes from a specific instance of @jonas/0001
1575    pub async fn send_own_block_await_response(
1576        &self,
1577        block: DXBBlock,
1578        options: ResponseOptions,
1579    ) -> Vec<Result<Response, ResponseError>> {
1580        let context_id = block.block_header.context_id;
1581        let section_index = block.block_header.section_index;
1582
1583        let has_exact_receiver_count = block.has_exact_receiver_count();
1584        let receivers = block.receiver_endpoints();
1585
1586        let res = self.send_own_block(block).await;
1587        let failed_endpoints = res.err().unwrap_or_default();
1588
1589        // yield
1590        #[cfg(feature = "tokio_runtime")]
1591        yield_now().await;
1592
1593        let timeout = options
1594            .timeout
1595            .unwrap_or_default(self.options.default_receive_timeout);
1596
1597        // return fixed number of responses
1598        if has_exact_receiver_count {
1599            // if resolution strategy is ReturnOnAnyError or ReturnOnFirstResult, directly return if any endpoint failed
1600            if (options.resolution_strategy
1601                == ResponseResolutionStrategy::ReturnOnAnyError
1602                || options.resolution_strategy
1603                    == ResponseResolutionStrategy::ReturnOnFirstResult)
1604                && !failed_endpoints.is_empty()
1605            {
1606                // for each failed endpoint, set NotReachable error, for all others EarlyAbort
1607                return receivers
1608                    .iter()
1609                    .map(|receiver| {
1610                        if failed_endpoints.contains(receiver) {
1611                            Err(ResponseError::NotReachable(receiver.clone()))
1612                        } else {
1613                            Err(ResponseError::EarlyAbort(receiver.clone()))
1614                        }
1615                    })
1616                    .collect::<Vec<_>>();
1617            }
1618
1619            // store received responses in map for all receivers
1620            let mut responses = HashMap::new();
1621            let mut missing_response_count = receivers.len();
1622            for receiver in &receivers {
1623                responses.insert(
1624                    receiver.clone(),
1625                    if failed_endpoints.contains(receiver) {
1626                        Err(ResponseError::NotReachable(receiver.clone()))
1627                    } else {
1628                        Err(ResponseError::NoResponseAfterTimeout(
1629                            receiver.clone(),
1630                            timeout,
1631                        ))
1632                    },
1633                );
1634            }
1635            // directly subtract number of already failed endpoints from missing responses
1636            missing_response_count -= failed_endpoints.len();
1637
1638            info!(
1639                "Waiting for responses from receivers {}",
1640                receivers
1641                    .iter()
1642                    .map(|e| e.to_string())
1643                    .collect::<Vec<_>>()
1644                    .join(",")
1645            );
1646
1647            let mut rx = self
1648                .block_handler
1649                .register_incoming_block_observer(context_id, section_index);
1650
1651            let res = task::timeout(timeout, async {
1652                while let Some(section) = rx.next().await {
1653                    let mut received_response = false;
1654                    // get sender
1655                    let mut sender = section.get_sender();
1656                    // add to response for exactly matching endpoint instance
1657                    if let Some(response) = responses.get_mut(&sender) {
1658                        // check if the receiver is already set (= current set response is Err)
1659                        if response.is_err() {
1660                            *response = Ok(Response::ExactResponse(sender.clone(), section));
1661                            missing_response_count -= 1;
1662                            info!("Received expected response from {sender}");
1663                            received_response = true;
1664                        }
1665                        // already received a response from this exact sender - this should not happen
1666                        else {
1667                            error!("Received multiple responses from the same sender: {sender}");
1668                        }
1669                    }
1670                    // add to response for matching endpoint
1671                    else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1672                        info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1673                        sender = sender.any_instance_endpoint();
1674                        // check if the receiver is already set (= current set response is Err)
1675                        if response.is_err() {
1676                            *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1677                            missing_response_count -= 1;
1678                            received_response = true;
1679                        }
1680                        // already received a response from a matching endpoint - ignore
1681                        else {
1682                            info!("Received multiple resolved responses from the {}", &sender);
1683                        }
1684                    }
1685                    // response from unexpected sender
1686                    else {
1687                        error!("Received response from unexpected sender: {}", &sender);
1688                    }
1689
1690                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1691                    if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1692                        // set all other responses to EarlyAbort
1693                        for (receiver, response) in responses.iter_mut() {
1694                            if receiver != &sender {
1695                                *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1696                            }
1697                        }
1698                        break;
1699                    }
1700
1701                    // if all responses are received, break
1702                    if missing_response_count == 0 {
1703                        break;
1704                    }
1705                }
1706            }).await;
1707
1708            if res.is_err() {
1709                error!("Timeout waiting for responses");
1710            }
1711
1712            // return responses as vector
1713            responses.into_values().collect::<Vec<_>>()
1714        }
1715        // return all received responses
1716        else {
1717            let mut responses = vec![];
1718
1719            let res = task::timeout(timeout, async {
1720                let mut rx =
1721                    self.block_handler.register_incoming_block_observer(
1722                        context_id,
1723                        section_index,
1724                    );
1725                while let Some(section) = rx.next().await {
1726                    // get sender
1727                    let sender = section.get_sender();
1728                    info!("Received response from {sender}");
1729                    // add to response for exactly matching endpoint instance
1730                    responses.push(Ok(Response::UnspecifiedResponse(section)));
1731
1732                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1733                    if options.resolution_strategy
1734                        == ResponseResolutionStrategy::ReturnOnFirstResult
1735                    {
1736                        break;
1737                    }
1738                }
1739            })
1740            .await;
1741
1742            if res.is_err() {
1743                info!("Timeout waiting for responses");
1744            }
1745
1746            responses
1747        }
1748    }
1749
1750    /// Sends a block to all endpoints specified in the block header.
1751    /// The routing algorithm decides which sockets are used to send the block, based on the endpoint.
1752    /// A block can be sent to multiple endpoints at the same time over a socket or to multiple sockets for each endpoint.
1753    /// The original_socket parameter is used to prevent sending the block back to the sender.
1754    /// When this method is called, the block is queued in the send queue.
1755    /// Returns an Err with a list of unreachable endpoints if the block could not be sent to all endpoints.
1756    pub fn send_block(
1757        &self,
1758        mut block: DXBBlock,
1759        exclude_sockets: Vec<ComInterfaceSocketUUID>,
1760        forked: bool,
1761    ) -> Result<(), Vec<Endpoint>> {
1762        let outbound_receiver_groups =
1763            self.get_outbound_receiver_groups(&block, exclude_sockets);
1764
1765        if outbound_receiver_groups.is_none() {
1766            error!("No outbound receiver groups found for block");
1767            return Err(vec![]);
1768        }
1769
1770        let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1771
1772        let mut unreachable_endpoints = vec![];
1773
1774        // currently only used for trace debugging (TODO: put behind debug flag)
1775        // if more than one addressed block is sent, the block is forked, thus the fork count is set to 0
1776        // for each forked block, the fork count is incremented
1777        // if only one block is sent, the block is just moved and not forked
1778        let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1779            Some(0)
1780        } else {
1781            None
1782        };
1783
1784        block.set_bounce_back(false);
1785
1786        for (receiver_socket, endpoints) in outbound_receiver_groups {
1787            if let Some(socket_uuid) = receiver_socket {
1788                self.send_block_to_endpoints_via_socket(
1789                    block.clone(),
1790                    &socket_uuid,
1791                    &endpoints,
1792                    fork_count,
1793                );
1794            } else {
1795                error!(
1796                    "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1797                    self.endpoint,
1798                    endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1799                );
1800                unreachable_endpoints.extend(endpoints);
1801            }
1802            // increment fork_count if Some
1803            if let Some(count) = fork_count {
1804                fork_count = Some(count + 1);
1805            }
1806        }
1807
1808        if !unreachable_endpoints.is_empty() {
1809            return Err(unreachable_endpoints);
1810        }
1811        Ok(())
1812    }
1813
1814    /// Sends a block via a socket to a list of endpoints.
1815    /// Before the block is sent, it is modified to include the list of endpoints as receivers.
1816    fn send_block_to_endpoints_via_socket(
1817        &self,
1818        mut block: DXBBlock,
1819        socket_uuid: &ComInterfaceSocketUUID,
1820        endpoints: &[Endpoint],
1821        // currently only used for trace debugging (TODO: put behind debug flag)
1822        fork_count: Option<usize>,
1823    ) {
1824        block.set_receivers(endpoints);
1825
1826        // assuming the distance was already increment during redirect, we
1827        // effectively decrement the block distance by 1 if it is a bounce back
1828        if block.is_bounce_back() {
1829            block.routing_header.distance -= 2;
1830        }
1831
1832        // if type is Trace or TraceBack, add the outgoing socket to the hops
1833        match block.block_header.flags_and_timestamp.block_type() {
1834            BlockType::Trace | BlockType::TraceBack => {
1835                let distance = block.routing_header.distance;
1836                let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1837                let bounce_back = block.is_bounce_back();
1838
1839                self.add_hop_to_block_trace_data(
1840                    &mut block,
1841                    NetworkTraceHop {
1842                        endpoint: self.endpoint.clone(),
1843                        distance,
1844                        socket: NetworkTraceHopSocket::new(
1845                            self.get_com_interface_from_socket_uuid(
1846                                socket_uuid,
1847                            )
1848                            .borrow_mut()
1849                            .get_properties(),
1850                            socket_uuid.clone(),
1851                        ),
1852                        direction: NetworkTraceHopDirection::Outgoing,
1853                        fork_nr: new_fork_nr,
1854                        bounce_back,
1855                    },
1856                );
1857            }
1858            _ => {}
1859        }
1860
1861        let socket = self.get_socket_by_uuid(socket_uuid);
1862        let mut socket_ref = socket.try_lock().unwrap();
1863
1864        let is_broadcast = endpoints
1865            .iter()
1866            .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1867
1868        if is_broadcast
1869            && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1870            && (direct_endpoint == &self.endpoint
1871                || direct_endpoint == &Endpoint::LOCAL)
1872        {
1873            return;
1874        }
1875        for interceptor in self.outgoing_block_interceptors.borrow().iter() {
1876            interceptor(&block, socket_uuid, endpoints);
1877        }
1878        match &block.to_bytes() {
1879            Ok(bytes) => {
1880                info!(
1881                    "Sending block to socket {}: {}",
1882                    socket_uuid,
1883                    endpoints.iter().map(|e| e.to_string()).join(", ")
1884                );
1885
1886                // TODO #190: resend block if socket failed to send
1887                socket_ref.queue_outgoing_block(bytes);
1888            }
1889            Err(err) => {
1890                error!("Failed to convert block to bytes: {err:?}");
1891            }
1892        }
1893    }
1894
1895    /// Updates all interfaces to handle reconnections if the interface can be reconnected
1896    /// or remove the interface if it cannot be reconnected.
1897    fn update_interfaces(&self) {
1898        let mut to_remove = Vec::new();
1899        for (interface, _) in self.interfaces.borrow().values() {
1900            let uuid = interface.borrow().get_uuid().clone();
1901            let state = interface.borrow().get_state();
1902
1903            // If the interface has been proactively destroyed, remove it from the hub
1904            // and clean up the sockets. This happens when the user calls the destroy
1905            // method on the interface and not the remove_interface on the ComHub.
1906            if state.is_destroyed() {
1907                info!("Destroying interface on the ComHub {uuid}");
1908                to_remove.push(uuid);
1909            } else if state.is_not_connected()
1910                && interface.borrow_mut().get_properties().shall_reconnect()
1911            {
1912                // If the interface is disconnected and the interface has
1913                // reconnection enabled, check if the interface should be reconnected
1914                let interface_rc = interface.clone();
1915                let mut interface = interface.borrow_mut();
1916
1917                let already_connecting =
1918                    interface.get_state() == ComInterfaceState::Connecting;
1919
1920                if !already_connecting {
1921                    let config = interface.get_properties_mut();
1922
1923                    let reconnect_now = match &config.reconnection_config {
1924                        ReconnectionConfig::InstantReconnect => true,
1925                        ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1926                            ReconnectionConfig::check_reconnect_timeout(
1927                                config.close_timestamp,
1928                                timeout,
1929                            )
1930                        }
1931                        ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1932                            timeout,
1933                            attempts,
1934                        } => {
1935                            let max_attempts = attempts;
1936
1937                            // check if the attempts are not exceeded
1938                            let attempts = config.reconnect_attempts.unwrap_or(0);
1939                            let attempts = attempts + 1;
1940                            if attempts > *max_attempts {
1941                                to_remove.push(uuid.clone());
1942                                return;
1943                            }
1944
1945                            config.reconnect_attempts = Some(attempts);
1946
1947                            ReconnectionConfig::check_reconnect_timeout(
1948                                config.close_timestamp,
1949                                timeout,
1950                            )
1951                        }
1952                        ReconnectionConfig::NoReconnect => false,
1953                    };
1954                    if reconnect_now {
1955                        debug!("Reconnecting interface {uuid}");
1956                        interface.set_state(ComInterfaceState::Connecting);
1957                        spawn_with_panic_notify(
1958                            &self.async_context,
1959                            reconnect_interface_task(interface_rc),
1960                        );
1961                    } else {
1962                        debug!("Not reconnecting interface {uuid}");
1963                    }
1964                }
1965            }
1966        }
1967
1968        for uuid in to_remove {
1969            self.cleanup_interface(uuid);
1970        }
1971    }
1972
1973    /// Updates all known sockets for all interfaces to update routing
1974    /// information, remove deleted sockets and add new sockets and endpoint relations
1975    async fn update_sockets(&self) {
1976        let mut new_sockets = Vec::new();
1977        let mut deleted_sockets = Vec::new();
1978        let mut registered_sockets = Vec::new();
1979
1980        for (interface, priority) in self.interfaces.borrow().values() {
1981            let socket_updates = interface.clone().borrow().get_sockets();
1982            let mut socket_updates = socket_updates.try_lock().unwrap();
1983
1984            registered_sockets
1985                .extend(socket_updates.socket_registrations.drain(..));
1986            new_sockets.extend(
1987                socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1988            );
1989            deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1990        }
1991
1992        for (socket, priority) in new_sockets {
1993            self.add_socket(socket.clone(), priority).await;
1994        }
1995        for socket_uuid in deleted_sockets {
1996            self.delete_socket(&socket_uuid);
1997        }
1998        for (socket_uuid, distance, endpoint) in registered_sockets {
1999            let socket = self.get_socket_by_uuid(&socket_uuid);
2000            self.register_socket_endpoint(socket, endpoint.clone(), distance)
2001                .unwrap_or_else(|e| {
2002                    error!(
2003                        "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
2004                    );
2005                });
2006        }
2007    }
2008
2009    /// Collects incoming data slices from all sockets. The sockets will call their
2010    /// BlockCollector to collect the data into blocks.
2011    async fn collect_incoming_data(&self) {
2012        // update sockets, collect incoming data into full blocks
2013        for (socket, _) in self.sockets.borrow().values() {
2014            let mut socket_ref = socket.try_lock().unwrap();
2015            socket_ref.collect_incoming_data().await;
2016        }
2017    }
2018
2019    /// Collects all blocks from the receive queues of all sockets and process them
2020    /// in the receive_block method.
2021    async fn receive_incoming_blocks(&self) {
2022        let mut blocks = vec![];
2023        // iterate over all sockets
2024        for (socket, _) in self.sockets.borrow().values() {
2025            let mut socket_ref = socket.try_lock().unwrap();
2026            let uuid = socket_ref.uuid.clone();
2027            let block_queue = socket_ref.get_incoming_block_queue();
2028            blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
2029        }
2030
2031        for (uuid, blocks) in blocks {
2032            for block in blocks.iter() {
2033                self.receive_block(block, uuid.clone()).await;
2034            }
2035        }
2036    }
2037
2038    /// Sends all queued blocks from all interfaces.
2039    fn flush_outgoing_blocks(&self) {
2040        let interfaces = self.interfaces.borrow();
2041        for (interface, _) in interfaces.values() {
2042            com_interface::flush_outgoing_blocks(
2043                interface.clone(),
2044                &self.async_context,
2045            );
2046        }
2047    }
2048}
2049
/// Strategy that decides when waiting for responses to a sent block ends.
/// The variant names mirror the JavaScript `Promise` combinators.
#[derive(Default, PartialEq, Debug)]
pub enum ResponseResolutionStrategy {
    /// Promise.allSettled
    /// - For known fixed receivers:
    ///   return after all known sends are finished (either success or error
    ///   if block could not be sent / timed out)
    /// - For unknown receiver count:
    ///   return after timeout
    #[default]
    ReturnAfterAllSettled,

    /// Promise.all
    /// - For known fixed receivers:
    ///   return after all known sends are finished successfully
    ///   return immediately if one send fails early (e.g. endpoint not reachable)
    /// - For unknown receiver count:
    ///   return after timeout
    ///
    ReturnOnAnyError,

    /// Promise.any
    /// Return after first successful response received
    // NOTE(review): `send_own_block_await_response` never checks for this
    // variant, so it currently waits like `ReturnAfterAllSettled` - confirm.
    ReturnOnFirstResponse,

    /// Promise.race
    /// Return after first response received (success or error)
    ReturnOnFirstResult,
}
2078
/// Timeout to use while waiting for responses: either the caller-supplied
/// default or an explicit custom duration.
#[derive(Default, Debug)]
pub enum ResponseTimeout {
    /// Use the default duration passed to [`ResponseTimeout::unwrap_or_default`].
    #[default]
    Default,
    /// Use exactly this duration.
    Custom(Duration),
}

impl ResponseTimeout {
    /// Resolves the timeout to a concrete duration: the custom duration if
    /// one is set, `default` otherwise.
    pub fn unwrap_or_default(self, default: Duration) -> Duration {
        if let ResponseTimeout::Custom(custom) = self {
            custom
        } else {
            default
        }
    }
}
2094
2095#[derive(Default, Debug)]
2096pub struct ResponseOptions {
2097    pub resolution_strategy: ResponseResolutionStrategy,
2098    pub timeout: ResponseTimeout,
2099}
2100
2101impl ResponseOptions {
2102    pub fn new_with_resolution_strategy(
2103        resolution_strategy: ResponseResolutionStrategy,
2104    ) -> Self {
2105        Self {
2106            resolution_strategy,
2107            ..ResponseOptions::default()
2108        }
2109    }
2110
2111    pub fn new_with_timeout(timeout: Duration) -> Self {
2112        Self {
2113            timeout: ResponseTimeout::Custom(timeout),
2114            ..ResponseOptions::default()
2115        }
2116    }
2117}
2118
2119#[derive(Debug)]
2120pub enum Response {
2121    ExactResponse(Endpoint, IncomingSection),
2122    ResolvedResponse(Endpoint, IncomingSection),
2123    UnspecifiedResponse(IncomingSection),
2124}
2125
2126impl Response {
2127    pub fn take_incoming_section(self) -> IncomingSection {
2128        match self {
2129            Response::ExactResponse(_, section) => section,
2130            Response::ResolvedResponse(_, section) => section,
2131            Response::UnspecifiedResponse(section) => section,
2132        }
2133    }
2134}
2135
/// Reason why no response is available for an expected receiver.
#[derive(Debug)]
pub enum ResponseError {
    /// No response from the endpoint arrived within the given timeout.
    NoResponseAfterTimeout(Endpoint, Duration),
    /// The block could not be sent to the endpoint.
    NotReachable(Endpoint),
    /// Waiting ended early (per the resolution strategy) before this endpoint responded.
    EarlyAbort(Endpoint),
}
2142
2143impl Display for ResponseError {
2144    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
2145        match self {
2146            ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
2147                core::write!(
2148                    f,
2149                    "No response after timeout ({}s) for endpoint {}",
2150                    duration.as_secs(),
2151                    endpoint
2152                )
2153            }
2154            ResponseError::NotReachable(endpoint) => {
2155                core::write!(f, "Endpoint {endpoint} is not reachable")
2156            }
2157            ResponseError::EarlyAbort(endpoint) => {
2158                core::write!(f, "Early abort for endpoint {endpoint}")
2159            }
2160        }
2161    }
2162}