Skip to main content

datex_core/network/com_hub/
mod.rs

1use crate::{
2    channel::mpsc::UnboundedSender,
3    collections::HashMap,
4    global::protocol_structures::{
5        block_header::BlockType, routing_header::SignatureType,
6    },
7    network::com_hub::{
8        errors::{ComHubError, SocketEndpointRegistrationError},
9        managers::com_interface_manager::ComInterfaceManager,
10        network_response::{
11            Response, ResponseError, ResponseOptions,
12            ResponseResolutionStrategy,
13        },
14        options::ComHubOptions,
15    },
16    task,
17    utils::maybe_async::SyncOrAsyncResolved,
18};
19
20use crate::prelude::*;
21
22pub mod managers;
23
24pub mod metadata;
25use crate::network::com_hub::managers::socket_manager::{
26    ComInterfaceSocketManager, SocketCloseReceiver, SocketData,
27};
28
29pub mod errors;
30pub mod network_response;
31
32pub mod network_tracing;
33use crate::network::com_interfaces::com_interface::socket::ComInterfaceSocketUUID;
34use core::{
35    cell::RefCell,
36    cmp::PartialEq,
37    fmt::{Debug, Formatter},
38    panic,
39    pin::Pin,
40    result::Result,
41};
42use itertools::Itertools;
43use log::{debug, error, info, warn};
44use serde::{Deserialize, Serialize};
45
46pub mod options;
47use crate::{
48    global::dxb_block::{DXBBlock, IncomingSection},
49    network::{
50        block_handler::{BlockHandler, BlockHistoryData},
51        com_hub::network_tracing::{
52            NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket,
53        },
54    },
55    values::core_values::endpoint::Endpoint,
56};
57pub mod com_hub_interface;
58#[cfg(test)]
59pub mod test_utils;
60mod com_hub_socket;
61
62use crate::{
63    collections::HashSet,
64    crypto::CryptoImpl,
65    global::dxb_block::{BlockId, SignatureValidationError},
66    network::{
67        com_hub::managers::com_interface_manager::InterfaceCloseReceiver,
68        com_interfaces::{
69            block_collector::BlockCollector,
70            com_interface::{
71                ComInterfaceUUID,
72                factory::{
73                    CloseAsyncCallback, ComInterfaceConfiguration,
74                    SendCallback, SendFailure, SendSuccess, SocketDataIterator,
75                    SocketProperties,
76                },
77                properties::InterfaceDirection,
78            },
79        },
80    },
81    utils::{
82        async_iterators::async_next_pin_box,
83        maybe_async::{MaybeAsync, SyncOrAsync, SyncOrAsyncResult},
84        task_manager::TaskManager,
85    },
86};
87use async_select::select;
88use datex_crypto_facade::crypto::Crypto;
89use futures::channel::{oneshot, oneshot::Sender};
90use futures_util::FutureExt;
91use crate::time::now_ms;
92
/// Callback type for incoming-block interceptors.
///
/// Each registered interceptor is invoked with a reference to every block
/// received by the hub and the UUID of the socket the block arrived on.
pub type IncomingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static>;
95
/// Callback type for outgoing-block interceptors.
///
/// Receives a block, a socket UUID and a slice of endpoints.
/// NOTE(review): presumably the socket the block is sent through and the
/// receivers reached via that socket — the call site is not visible in this
/// part of the file, confirm there.
pub type OutgoingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static>;
98
/// Communication hub of a runtime: owns the socket and interface registries,
/// the block handler and the task manager used to run socket handler tasks.
pub struct ComHub {
    /// the runtime endpoint of the hub (@me)
    pub endpoint: Endpoint,

    /// ComHub configuration options
    pub options: ComHubOptions,

    /// Registry of all sockets known to the hub (registration, endpoint
    /// assignment and cleanup go through this manager)
    socket_manager: ComInterfaceSocketManager,
    /// Registry of all com interfaces known to the hub
    interfaces_manager: ComInterfaceManager,

    /// Handler that receives blocks addressed to this endpoint and keeps the
    /// block history
    pub block_handler: BlockHandler,

    /// Interceptors called for every received block
    incoming_block_interceptors: RefCell<Vec<IncomingBlockInterceptor>>,
    /// Interceptors registered for outgoing blocks
    outgoing_block_interceptors: RefCell<Vec<OutgoingBlockInterceptor>>,

    /// Task manager on which the interface/socket handler tasks are registered
    pub task_manager: TaskManager,
}
116
117impl Debug for ComHub {
118    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
119        f.debug_struct("ComHub")
120            .field("endpoint", &self.endpoint)
121            .field("options", &self.options)
122            .finish()
123    }
124}
125
/// Fallback-routing priority of a com interface.
///
/// Because `Ord` is derived and `None` is declared first, `None` compares
/// lower than every `Priority(_)`, and `Priority` values compare by their
/// number.
#[derive(
    Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize,
)]
#[cfg_attr(feature = "wasm_runtime", derive(tsify::Tsify))]
pub enum InterfacePriority {
    /// The interface will not be used for fallback routing if no other interface is available
    /// This is useful for interfaces which cannot communicate with the outside world or are not
    /// capable of redirecting large amounts of data
    None,
    /// The interface will be used for fallback routing if no other interface is available,
    /// depending on the defined priority
    /// A higher number means a higher priority
    Priority(u16),
}
140
141impl From<Option<u16>> for InterfacePriority {
142    fn from(value: Option<u16>) -> Self {
143        match value {
144            Some(priority) => InterfacePriority::Priority(priority),
145            None => InterfacePriority::default(),
146        }
147    }
148}
149
150impl Default for InterfacePriority {
151    fn default() -> Self {
152        InterfacePriority::Priority(0)
153    }
154}
155
/// Result of the synchronous preprocessing step applied to a received block.
pub struct ReceiveBlockPreprocessResult {
    /// Copy of the block with its receivers replaced by the endpoints it must
    /// be relayed to, or `None` if nothing has to be relayed
    relayed_block: Option<DXBBlock>,
    /// The block itself if it is addressed to this endpoint and is not a
    /// `Hello` block, otherwise `None`
    own_received_block: Option<DXBBlock>,
    /// Block id to add to the block history; only set if the block was not
    /// already in the history
    block_id_for_history: Option<BlockId>,
    /// Whether any receiver of the block is the local endpoint or one of the
    /// `ANY` wildcard endpoints
    is_for_own: bool,
}
162
/// Sync-or-async result type used when sending a block, generic over the
/// future type `F` of the async case.
/// NOTE(review): the roles of the individual type parameters
/// (`Option<Vec<Vec<u8>>>`, `()`, `Vec<Endpoint>`) are defined by
/// `SyncOrAsyncResult`, which is not visible here — presumably the error type
/// lists the endpoints that could not be reached; confirm.
pub type BlockSendSyncOrAsyncResult<F> =
    SyncOrAsyncResult<Option<Vec<Vec<u8>>>, (), Vec<Endpoint>, F>;
165
/// Boxed, pinned future that resolves to a prepared [`DXBBlock`] or a
/// [`ComHubError`]; the future may borrow data for the lifetime `'a`.
pub type PrepareOwnBlockFuture<'a> =
    Pin<Box<dyn Future<Output = Result<DXBBlock, ComHubError>> + 'a>>;
168
/// Sync-or-async result of preparing an own block: yields a [`DXBBlock`] or a
/// [`ComHubError`], using [`PrepareOwnBlockFuture`] for the async case.
/// NOTE(review): the exact meaning of the two `DXBBlock` parameters is defined
/// by `SyncOrAsyncResult` (not visible here) — presumably the sync and the
/// async success value; confirm.
pub type PrepareOwnBlockResult<'a> = SyncOrAsyncResult<
    DXBBlock,
    DXBBlock,
    ComHubError,
    PrepareOwnBlockFuture<'a>,
>;
175
/// Result of receiving a block: `Ok(Some(block))` if the block is for the own
/// endpoint and has to be handled locally, `Ok(None)` if there is nothing to
/// handle locally, or a [`SignatureValidationError`] if signature validation
/// failed.
pub type ReceiveBlockResult =
    Result<Option<DXBBlock>, SignatureValidationError>;
178
179impl ComHub {
180    pub fn create(
181        endpoint: impl Into<Endpoint>,
182        incoming_sections_sender: UnboundedSender<IncomingSection>,
183    ) -> (Rc<ComHub>, impl Future<Output = ()>) {
184        let (task_manager, task_future) = TaskManager::create();
185
186        let block_handler = BlockHandler::init(incoming_sections_sender);
187        let com_hub = Rc::new(ComHub {
188            endpoint: endpoint.into(),
189            options: ComHubOptions::default(),
190            block_handler,
191            socket_manager: ComInterfaceSocketManager::new(),
192            interfaces_manager: ComInterfaceManager::default(),
193            incoming_block_interceptors: RefCell::new(Vec::new()),
194            outgoing_block_interceptors: RefCell::new(Vec::new()),
195            task_manager,
196        });
197
198        (com_hub, task_future)
199    }
200
201    /// Registers the handle_sockets_task for the given ComInterfaceConfiguration
202    pub(crate) fn register_com_interface_handler(
203        self: Rc<Self>,
204        com_interface_configuration: ComInterfaceConfiguration,
205        priority: InterfacePriority,
206        close_receiver: InterfaceCloseReceiver,
207    ) -> Option<impl Future<Output = Result<(), ()>>> {
208        let (socket_ready_senders, socket_ready_fut) =
209            if com_interface_configuration.has_single_socket {
210                let (r, f) = ComHub::get_ready_senders();
211                (Some(r), Some(f))
212            } else {
213                (None, None)
214            };
215
216        self.task_manager
217            .register_task(self.clone().handle_sockets_task(
218                com_interface_configuration,
219                priority,
220                close_receiver,
221                socket_ready_senders,
222            ));
223        socket_ready_fut
224    }
225
226    /// Sets up two oneshot receivers:
227    /// The first one should resolve once the socket is fully connected, meaning the peer endpoint of the socket is known
228    /// The second one should resolve once the local endpoint has sent a Hello block
229    /// Returns a future that resolves once both oneshot receivers have resolved
230    fn get_ready_senders() -> (
231        (Sender<Result<(), ()>>, Sender<Result<(), ()>>),
232        impl Future<Output = Result<(), ()>>,
233    ) {
234        let (socket_ready_sender, socket_ready_receiver) = oneshot::channel();
235        let (socket_hello_sent_sender, socket_hello_sent_receiver) =
236            oneshot::channel();
237        let fut = async move {
238            match socket_ready_receiver.await {
239                Ok(Ok(())) => match socket_hello_sent_receiver.await {
240                    Ok(Ok(())) => Ok(()),
241                    Ok(Err(())) => Err(()),
242                    Err(_) => Err(()),
243                },
244                Ok(Err(())) => Err(()),
245                Err(_) => Err(()),
246            }
247        };
248        ((socket_ready_sender, socket_hello_sent_sender), fut)
249    }
250
    /// Iterates over the given NewSocketsIterator for an interface and handles each socket
    ///
    /// The task runs until one of the following happens:
    /// * the socket iterator is exhausted or yields an error,
    /// * the interface is no longer registered in the interface manager,
    /// * a value is received on `interface_close_receiver`.
    ///
    /// Every new socket is registered in the socket manager; if the socket
    /// provides a data iterator, a `handle_socket_task` is registered for it.
    /// Afterwards the interface cleanup callback is awaited, the interface is
    /// marked as no longer waiting for socket connections and is cleaned up if
    /// no sockets are registered for it.
    ///
    /// `ready_senders` (only provided for single-socket interfaces) are handed
    /// out at most once: the "socket ready" sender goes to the first
    /// registered socket, the "hello sent" sender to the first socket that has
    /// a data iterator. Senders that were never handed out are resolved with
    /// `Err(())` at the end.
    ///
    /// # Panics
    /// Panics if receiving from `interface_close_receiver` fails, if
    /// `cleanup_interface` returns an error, or if the close confirmation
    /// cannot be sent back.
    async fn handle_sockets_task(
        self: Rc<Self>,
        com_interface_configuration: ComInterfaceConfiguration,
        interface_priority: InterfacePriority,
        interface_close_receiver: InterfaceCloseReceiver,
        ready_senders: Option<(Sender<Result<(), ()>>, Sender<Result<(), ()>>)>,
    ) {
        let com_interface_uuid = com_interface_configuration.uuid();
        let mut iterator = com_interface_configuration.new_sockets_iterator;
        let com_interface_properties =
            com_interface_configuration.properties.clone();
        // interface-level cleanup callback (shadowed by the socket-level one inside the loop)
        let cleanup_callback = com_interface_configuration.close_async_callback;

        // split the optional pair so that each sender can be taken independently
        let (mut socket_ready_sender, mut hello_sent_sender) =
            match ready_senders {
                Some((x, y)) => (Some(x), Some(y)),
                None => (None, None),
            };

        // race the socket accept loop against the interface close signal;
        // `closed_sender` is only set when the close signal was received
        let closed_sender = select!(
            _ = async {
                while let Some(socket) = async_next_pin_box(&mut iterator).await {

                    if !self.interfaces_manager.has_interface(&com_interface_uuid) {
                        info!("Interface {} was removed while waiting for new socket connections, stopping socket handler task", com_interface_uuid);
                        break;
                    }

                    match socket {
                        Ok(socket_configuration) => {
                            let socket_iterator = socket_configuration.iterator;
                            let send_callback = socket_configuration.send_callback;
                            // socket-level cleanup callback, passed on to the socket task
                            let cleanup_callback = socket_configuration.close_async_callback;
                            let socket_properties = socket_configuration.properties;
                            let _socket_uuid = socket_properties.uuid();
                            let _socket_direction = socket_properties.direction.clone();
                            let (socket_close_sender, socket_close_receiver) = oneshot::channel();

                            // store socket info
                            let _res = self.socket_manager.register_socket(
                                SocketData {
                                    socket_properties: socket_properties.clone(),
                                    interface_uuid: com_interface_uuid.clone(),
                                    interface_properties: com_interface_properties
                                        .clone(),
                                    send_callback,
                                    endpoints: HashSet::new(),
                                    close_sender: Some(socket_close_sender),
                                    // only the first registered socket receives the ready sender
                                    socket_ready_sender: socket_ready_sender.take(),
                                },
                                interface_priority,
                            );
                            // TODO #729: handle error

                            let self_clone = self.clone();
                            // sockets without a data iterator get no handler task
                            if let Some(socket_iterator) = socket_iterator {
                                self_clone.task_manager.register_task(
                                    self_clone.clone().handle_socket_task(
                                        socket_properties,
                                        socket_iterator,
                                        com_interface_uuid.clone(),
                                        com_interface_properties.auto_identify,
                                        socket_close_receiver,
                                        cleanup_callback,
                                        hello_sent_sender.take()
                                    ),
                                );
                            }
                        }
                        Err(e) => {
                            error!("Error creating socket from iterator: {:?}", e);
                            break;
                        }
                    }
                }
            } => {
                None
            },
            sender = interface_close_receiver => {
                Some(sender.unwrap())
            }
        );

        // call cleanup callback for interface if defined
        if let Some(cleanup_callback) = cleanup_callback {
            cleanup_callback().await;
        }

        // only if interface still exists
        if self.interfaces_manager.has_interface(&com_interface_uuid) {
            // indicate that interface is no longer waiting for new socket connections (e.g. for single socket interfaces)
            self.interfaces_manager
                .set_interface_waiting_for_socket_connections(
                    &com_interface_uuid,
                    false,
                );

            // if interface has no sockets, it can be destroyed
            if !self
                .socket_manager
                .are_sockets_registered_for_interface(&com_interface_uuid)
            {
                self.interfaces_manager
                    .cleanup_interface(&com_interface_uuid)
                    .unwrap();
                info!(
                    "Destroyed interface {} as it has no sockets registered",
                    com_interface_uuid
                );
            }
        }

        // confirm the close request once cleanup is done
        if let Some(closed_sender) = closed_sender {
            closed_sender.send(()).unwrap();
        }

        // if socket_ready_sender is still Some, socket was never connected, send Error
        if let Some(socket_ready_sender) = socket_ready_sender {
            let _ = socket_ready_sender.send(Err(()));
        }
        // if hello_sent_sender is still Some, hello was never sent, send Error
        if let Some(hello_sent_sender) = hello_sent_sender {
            let _ = hello_sent_sender.send(Err(()));
        }
    }
377
378    /// Sends a hello block via the given socket if the socket direction allows sending and auto_identify is enabled for the interface
379    async fn send_socket_hello(
380        self: Rc<Self>,
381        socket_uuid: ComInterfaceSocketUUID,
382        socket_direction: InterfaceDirection,
383        auto_identify: bool,
384        hello_sent_sender: Option<Sender<Result<(), ()>>>,
385    ) {
386        let send_hello = socket_direction.can_send() && auto_identify; // Only send hello if auto_identify is enabled
387
388        if send_hello {
389            info!("Saying hello to {}", socket_uuid);
390            if let Err(err) = self.send_hello_block(socket_uuid).await {
391                error!("Failed to send hello block: {:?}", err);
392                hello_sent_sender.map(|sender| sender.send(Err(())));
393            } else {
394                hello_sent_sender.map(|sender| sender.send(Ok(())));
395            }
396        } else {
397            hello_sent_sender.map(|sender| sender.send(Ok(())));
398        }
399    }
400
    /// Handles incoming data from the given SocketDataIterator
    ///
    /// Steps:
    /// 1. registers a background task that sends the hello block
    ///    (`send_socket_hello`), which also resolves `hello_sent_sender`,
    /// 2. feeds all data received from `socket_iterator` into a
    ///    `BlockCollector` and handles every block the collector yields,
    /// 3. stops when the socket iterator ends or fails, when the collector
    ///    rejects data, or when a value arrives on `close_receiver`,
    /// 4. awaits the socket cleanup callback, removes the socket from the
    ///    socket manager and cleans up the interface if it still exists and
    ///    is no longer waiting for socket connections.
    ///
    /// # Panics
    /// Panics if receiving from `close_receiver` fails, if `cleanup_interface`
    /// returns an error, or if the close confirmation cannot be sent back.
    async fn handle_socket_task(
        self: Rc<Self>,
        socket_properties: SocketProperties,
        mut socket_iterator: SocketDataIterator,
        com_interface_uuid: ComInterfaceUUID,
        auto_identify: bool,
        close_receiver: SocketCloseReceiver,
        cleanup_callback: Option<CloseAsyncCallback>,
        hello_sent_sender: Option<Sender<Result<(), ()>>>,
    ) {
        info!("start handle socket task");

        // send hello block in background task
        self.task_manager
            .register_task(self.clone().send_socket_hello(
                socket_properties.uuid(),
                socket_properties.direction.clone(),
                auto_identify,
                hello_sent_sender,
            ));

        // raw socket data goes into `bytes_sender`, complete blocks come out of `block_iterator`
        let (mut bytes_sender, block_iterator) = BlockCollector::create();
        let mut block_iterator = Box::pin(block_iterator);

        // race the receive loop against the socket close signal;
        // `closed_sender` is only set when the close signal was received
        let closed_sender = select!(
            _ = async {
                loop {
                    select! {
                        // receive new block data from socket
                        data = async_next_pin_box(&mut socket_iterator).fuse() => {
                            match data {

                                // next data block
                                Some(Ok(data)) => {
                                    // send data to block collector
                                    if let Err(e) = bytes_sender.start_send(data) {
                                        error!("Error sending data to BlockCollector: {:?}", e);
                                        break;
                                    }
                                }

                                // got error
                                Some(Err(_)) => {
                                    error!("Socket {} closed, removing socket", socket_properties.uuid());
                                    break;
                                }

                                // no more data, graceful exit
                                None => {
                                    error!("Socket {} closed (iterator finished), removing socket", socket_properties.uuid());
                                    break;
                                }
                            }
                        },
                        // receive new blocks from block collector
                        Some(block) = async_next_pin_box(&mut block_iterator).fuse() => {
                            // handle the incoming block
                            // NOTE(review): an earlier comment here described spawning a separate task
                            // (to handle blocks in parallel and to keep this future small enough for
                            // embedded targets), but the call is awaited inline — confirm which
                            // behavior is intended
                            self.clone().handle_incoming_block_async(block, socket_properties.uuid()).await;
                        },

                        complete => break,
                    }
                }
            } => None,
            closed_sender = close_receiver => {
                info!("received socket close signal for socket removing socket");
                Some(closed_sender.unwrap())
            }
        );

        // call cleanup callback for socket if defined
        if let Some(cleanup_callback) = cleanup_callback {
            cleanup_callback().await;
        }

        // socket closed, remove
        self.socket_manager
            .cleanup_socket(&socket_properties.uuid());

        // only if interface still exists
        if self.interfaces_manager.has_interface(&com_interface_uuid) {
            // TODO #730: check if any other sockets are still registered for the interface, if not
            // and if interface is no longer waiting for new socket connections (e.g. single socket interface), also remove the interface
            if !self
                .interfaces_manager
                .is_interface_waiting_for_socket_connections(
                    &com_interface_uuid,
                )
            {
                self.interfaces_manager
                    .cleanup_interface(&com_interface_uuid)
                    .unwrap();
                info!(
                    "Destroyed interface {} as it is no longer waiting for socket connections",
                    com_interface_uuid
                );
                // TODO #731: reconnect logic?
            }
        }

        // confirm the close request once cleanup is done
        if let Some(closed_sender) = closed_sender {
            closed_sender.send(()).unwrap();
        }
    }
508
509    /// Handles an incoming block from a socket (async)
510    async fn handle_incoming_block_async(
511        self: Rc<Self>,
512        block: DXBBlock,
513        socket_uuid: ComInterfaceSocketUUID,
514    ) {
515        // handle incoming block
516        let receive_block_result = self
517            .clone()
518            .receive_block(block, socket_uuid)
519            .into_future()
520            .await;
521
522        let own_received_block = match receive_block_result {
523            Ok(own_received_block) => own_received_block,
524            Err(e) => {
525                error!("Failed to validate block signature: {:?}", e);
526                return;
527            }
528        };
529
530        // handle own block if some
531        if let Some(own_received_block) = own_received_block {
532            self.block_handler.handle_incoming_block(own_received_block);
533        }
534    }
535
536    /// Checks if the given endpoint is the local endpoint, matching instances as well
537    pub fn is_local_endpoint_exact(&self, endpoint: &Endpoint) -> bool {
538        &self.endpoint == endpoint || endpoint.is_local()
539    }
540
541    /// Register an incoming block interceptor
542    pub fn register_incoming_block_interceptor<F>(&self, interceptor: F)
543    where
544        F: Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static,
545    {
546        self.incoming_block_interceptors
547            .borrow_mut()
548            .push(Box::new(interceptor));
549    }
550
551    /// Register an outgoing block interceptor
552    pub fn register_outgoing_block_interceptor<F>(&self, interceptor: F)
553    where
554        F: Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static,
555    {
556        self.outgoing_block_interceptors
557            .borrow_mut()
558            .push(Box::new(interceptor));
559    }
560
561    /// Receives a block from a socket and handles it accordingly
562    /// Returns a MaybeAsync which is async in the following cases:
563    /// * the block signature is validated because the block is for own endpoint and signature is set
564    /// * the block needs to be relayed to other endpoints or is a trace block, which requires async handling for the relay/trace logic
565    /// The MaybeAsync returns the own received block if the block is for own endpoint and signature is valid, otherwise None
566    // FIXME #732: this seams to generate a very big future which causes heap allocation to fail on embedded targets (!?)
567    pub(crate) fn receive_block(
568        self: Rc<Self>,
569        block: DXBBlock,
570        socket_uuid: ComInterfaceSocketUUID,
571    ) -> MaybeAsync<ReceiveBlockResult, impl Future<Output = ReceiveBlockResult>>
572    {
573        // preprocess the block and get relay receivers and own received block if any
574        let preprocess_result =
575            self.receive_block_preprocess(&socket_uuid, block);
576
577        let self_clone = self.clone();
578
579        // validate block signature if sent to own endpoint
580        let validation_result = match preprocess_result.own_received_block {
581            // if block is for own endpoint, validate signature if set
582            Some(block) => block
583                .validate_signature()
584                .map(|validation| validation.map(Some)),
585            // if block is not for own endpoint, don't validate signature and don't return own received block
586            None => MaybeAsync::Sync(Ok(None)),
587        };
588
589        validation_result
590            .map(move |validation_result| {
591                // handle async logic for received blocks if needed
592                let own_received_block = match validation_result {
593                    Ok(block) => block,
594                    Err(e) => return MaybeAsync::Sync(Err(e)),
595                };
596
597                let (trace_block, own_block) = match own_received_block {
598                    Some(block) => {
599                        let block_type = block.block_type();
600                        match block_type {
601                            BlockType::Trace | BlockType::TraceBack => {
602                                (Some(block), None)
603                            }
604                            _ => (None, Some(block)),
605                        }
606                    }
607                    None => (None, None),
608                };
609
610                if preprocess_result.relayed_block.is_some()
611                    || trace_block.is_some()
612                {
613                    MaybeAsync::Async(async move {
614                        self_clone
615                            .receive_block_async(
616                                trace_block,
617                                preprocess_result.relayed_block,
618                                preprocess_result.block_id_for_history,
619                                socket_uuid,
620                                preprocess_result.is_for_own,
621                            )
622                            .await;
623
624                        Ok(own_block)
625                    })
626                }
627                // otherwise, return directly without async handler
628                else {
629                    MaybeAsync::Sync(Ok(own_block))
630                }
631            })
632            .flatten()
633    }
634
635    /// Preprocesses a received block and returns relay receivers and own received block if any
636    fn receive_block_preprocess(
637        &self,
638        socket_uuid: &ComInterfaceSocketUUID,
639        block: DXBBlock,
640    ) -> ReceiveBlockPreprocessResult {
641        info!("{} received block: {}", self.endpoint, block);
642
643        for interceptor in self.incoming_block_interceptors.borrow().iter() {
644            interceptor(&block, socket_uuid);
645        }
646
647        let block_type = block.block_header.flags_and_timestamp.block_type();
648
649        // register in block history
650        let is_new_block = !self.block_handler.is_block_in_history(&block);
651
652        // assign endpoint to socket if none is assigned
653        // only if a new block and the sender in not the local endpoint
654        if is_new_block
655            && !self.is_local_endpoint_exact(&block.routing_header.sender)
656        {
657            self.register_socket_endpoint_from_incoming_block(
658                socket_uuid.clone(),
659                &block,
660            );
661        }
662
663        let all_receivers = block.receiver_endpoints();
664        let (relayed_block, own_received_block, is_for_own) =
665            if !all_receivers.is_empty() {
666                let is_for_own = all_receivers.iter().any(|e| {
667                    self.is_local_endpoint_exact(e)
668                        || e == &Endpoint::ANY
669                        || e == &Endpoint::ANY_ALL_INSTANCES
670                });
671
672                // handle blocks for own endpoint
673                let own_received_block =
674                    if is_for_own && block_type != BlockType::Hello {
675                        info!("Block is for this endpoint");
676
677                        Some(block.clone()) // FIXME #733: no clone
678                    } else {
679                        None
680                    };
681
682                // TODO #177: handle this via TTL, not explicitly for Hello blocks
683                let relay_receivers = {
684                    let should_relay =
685                    // don't relay "Hello" blocks sent to own endpoint
686                    !(
687                        is_for_own && block_type == BlockType::Hello
688                    );
689
690                    // relay the block to other endpoints
691                    if should_relay {
692                        let relay_receivers = if is_for_own {
693                            // get all receivers that the block must be relayed to
694                            self.get_remote_receivers(&all_receivers)
695                        } else {
696                            all_receivers
697                        };
698                        if relay_receivers.is_empty() {
699                            None
700                        } else {
701                            Some(relay_receivers)
702                        }
703                    } else {
704                        None
705                    }
706                };
707
708                let relayed_block = relay_receivers
709                    .map(|receivers| block.clone_with_new_receivers(receivers));
710
711                (relayed_block, own_received_block, is_for_own)
712            } else {
713                (None, None, false)
714            };
715
716        // add to block history
717        let block_id_for_history = if is_new_block {
718            Some(block.get_block_id())
719        } else {
720            None
721        };
722
723        ReceiveBlockPreprocessResult {
724            relayed_block,
725            own_received_block,
726            block_id_for_history,
727            is_for_own,
728        }
729    }
730
731    /// Handles async logic for received blocks (trace blocks, redirects to other endpoints)
732    pub(crate) async fn receive_block_async(
733        self: Rc<Self>,
734        trace_block: Option<DXBBlock>,
735        relayed_block: Option<DXBBlock>,
736        block_id_for_history: Option<BlockId>,
737        socket_uuid: ComInterfaceSocketUUID,
738        is_for_own: bool,
739    ) {
740        // handle trace block asynchronously
741        if let Some(block) = trace_block {
742            info!("Handling trace block asynchronously");
743
744            match block.block_type() {
745                BlockType::Trace => {
746                    self.handle_trace_block(&block, socket_uuid.clone()).await;
747                }
748                BlockType::TraceBack => {
749                    self.handle_trace_back_block(&block, socket_uuid.clone());
750                }
751                _ => unreachable!(), // not a trace block, should never happen
752            }
753        }
754
755        // redirect block to other endpoints
756        if let Some(block) = relayed_block {
757            match block.block_type() {
758                BlockType::Trace | BlockType::TraceBack => {
759                    self.redirect_trace_block(
760                        block,
761                        socket_uuid.clone(),
762                        is_for_own,
763                    )
764                    .await;
765                }
766                _ => {
767                    self.redirect_block(block, socket_uuid.clone(), is_for_own)
768                        .await
769                        .unwrap(); // TODO #734: handle error
770                }
771            }
772        }
773
774        // add to block history
775        if let Some(block_id) = block_id_for_history {
776            self.block_handler
777                .add_block_id_to_history(block_id, Some(socket_uuid));
778        }
779    }
780
781    /// Returns a list of all receivers from a given ReceiverEndpoints
782    /// excluding the local endpoint
783    fn get_remote_receivers(
784        &self,
785        receiver_endpoints: &[Endpoint],
786    ) -> Vec<Endpoint> {
787        receiver_endpoints
788            .iter()
789            .filter(|e| !self.is_local_endpoint_exact(e))
790            .cloned()
791            .collect::<Vec<_>>()
792    }
793
794    /// Registers the socket endpoint from an incoming block
795    /// if the endpoint is not already registered for the socket
796    fn register_socket_endpoint_from_incoming_block(
797        &self,
798        socket_uuid: ComInterfaceSocketUUID,
799        block: &DXBBlock,
800    ) {
801        let mut socket =
802            self.socket_manager.get_socket_by_uuid_mut(&socket_uuid);
803
804        let distance = block.routing_header.distance;
805        let sender = block.routing_header.sender.clone();
806
807        // set as direct endpoint if distance = 0
808        if socket.socket_properties.direct_endpoint.is_none() && distance == 1 {
809            info!(
810                "Setting direct endpoint for socket {}: {}",
811                socket.socket_properties.uuid(),
812                sender
813            );
814            socket.socket_properties.direct_endpoint = Some(sender.clone());
815        }
816        let uuid = socket.socket_properties.uuid().clone();
817
818        drop(socket);
819
820        match self.socket_manager.register_socket_endpoint(
821            uuid,
822            sender.clone(),
823            distance,
824        ) {
825            Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
826                debug!(
827                    "Socket already registered for endpoint {sender}",
828                );
829            }
830            Err(error) => {
831                core::panic!("Failed to register socket endpoint {sender}: {error:?}");
832            },
833            Ok(_) => { }
834        }
835    }
836
837    /// Prepares a block and relays it to the given receivers.
838    /// The routing distance is incremented by 1.
839    pub(crate) async fn redirect_block(
840        &self,
841        mut block: DXBBlock,
842        incoming_socket: ComInterfaceSocketUUID,
843        // only for debugging traces
844        forked: bool,
845    ) -> Result<(), Vec<Endpoint>> {
846        let receivers = block.receiver_endpoints();
847
848        // check if block has already passed this endpoint (-> bounced back block)
849        // and add to blacklist for all receiver endpoints
850        let history_block_data =
851            self.block_handler.get_block_data_from_history(&block);
852        if history_block_data.is_some() {
853            for receiver in &receivers {
854                if !self.is_local_endpoint_exact(receiver) {
855                    info!(
856                        "{}: Adding socket {} to blacklist for receiver {}",
857                        self.endpoint, incoming_socket, receiver
858                    );
859                    self.socket_manager.add_to_endpoint_blocklist(
860                        receiver.clone(),
861                        &incoming_socket,
862                    );
863                }
864            }
865        }
866
867        // increment distance for next hop
868        block.routing_header.distance += 1;
869
870        // ensure ttl is >= 1
871        // decrease TTL by 1
872        if block.routing_header.ttl > 1 {
873            block.routing_header.ttl -= 1;
874        }
875        // if ttl becomes 0 after decrement drop the block
876        else if block.routing_header.ttl == 1 {
877            block.routing_header.ttl -= 1;
878            warn!("Block TTL expired. Dropping block...");
879            return Ok(());
880        // else ttl must be zero
881        } else {
882            warn!("Block TTL expired. Dropping block...");
883            return Ok(());
884        }
885
886        let mut prefer_incoming_socket_for_bounce_back = false;
887        // if we are the original sender of the block, don't send again (prevent loop) and send
888        // bounce back block with all receivers
889        let res = {
890            if self.is_local_endpoint_exact(&block.routing_header.sender) {
891                // if not bounce back block, directly send back to incoming socket (prevent loop)
892                prefer_incoming_socket_for_bounce_back =
893                    !block.is_bounce_back();
894                Err(receivers.to_vec())
895            } else {
896                let mut excluded_sockets = vec![incoming_socket.clone()];
897                if let Some(BlockHistoryData {
898                    original_socket_uuid: Some(original_socket_uuid),
899                }) = &history_block_data
900                {
901                    excluded_sockets.push(original_socket_uuid.clone())
902                }
903                self.send_block_async(block.clone(), excluded_sockets, forked)
904                    .await
905            }
906        };
907
908        // send block for unreachable endpoints back to the sender
909        if let Err(unreachable_endpoints) = res {
910            // try to send back to original socket
911            // if already in history, get original socket from history
912            // otherwise, directly send back to the incoming socket
913            let send_back_socket = if !prefer_incoming_socket_for_bounce_back
914                && let Some(history_block_data) = history_block_data
915            {
916                history_block_data.original_socket_uuid
917            } else {
918                Some(incoming_socket.clone())
919            };
920
921            // If a send_back_socket is set, the original block is not from this endpoint,
922            // so we can send it back to the original socket
923            if let Some(send_back_socket) = send_back_socket {
924                // never send a bounce back block back again to the incoming socket
925                if block.is_bounce_back() && send_back_socket == incoming_socket
926                {
927                    warn!(
928                        "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
929                        self.endpoint
930                    );
931                    Ok(())
932                } else if let Some(socket) =
933                    self.socket_manager.get_socket_by_uuid(&send_back_socket)
934                    && socket.socket_properties.direction.can_send()
935                {
936                    block.set_bounce_back(true);
937                    self
938                        .send_block_to_endpoints_via_socket(
939                            block,
940                            send_back_socket,
941                            unreachable_endpoints.clone(),
942                            if forked { Some(0) } else { None },
943                        )
944                        .into_error_future()
945                        .await
946                    .map_or(Ok(()), |e| {
947                        error!(
948                            "{}: Failed to send bounce back block to socket: {:?}",
949                            self.endpoint, e
950                        );
951                        Err(unreachable_endpoints)
952                    })
953                } else {
954                    error!(
955                        "Tried to send bounce back block, but cannot send back to incoming socket"
956                    );
957                    Err(unreachable_endpoints)
958                }
959            }
960            // Otherwise, the block originated from this endpoint, we can just call send again
961            // and try to send it via other remaining sockets that are not on the blacklist for the
962            // block receiver
963            else {
964                self.send_block_async(block, vec![], forked).await.map_or(Ok(()), |e| {
965                    error!(
966                        "{}: Failed to send bounce back block to socket: {:?}",
967                        self.endpoint, e
968                    );
969                    Err(unreachable_endpoints)
970                })
971            }
972        } else {
973            Ok(())
974        }
975    }
976
977    /// Prepares an own block for sending by setting sender, timestamp, distance and signing if needed.
978    /// Will return either synchronously or asynchronously depending on the signature type.
979    pub fn prepare_own_block(
980        &self,
981        mut block: DXBBlock,
982    ) -> PrepareOwnBlockResult<'_> {
983        /// Updates the sender and timestamp of the block
984        fn update_sender_and_timestamp(
985            mut block: DXBBlock,
986            endpoint: Endpoint,
987        ) -> Result<DXBBlock, ComHubError> {
988            let now = now_ms();
989            block.routing_header.sender = endpoint;
990            block
991                .block_header
992                .flags_and_timestamp
993                .set_creation_timestamp(now);
994            block.routing_header.distance = 1;
995            Ok(block)
996        }
997
998        match block.routing_header.flags.signature_type() {
999            // SignatureType::None can be handled synchronously
1000            SignatureType::None => SyncOrAsync::Sync(
1001                update_sender_and_timestamp(block, self.endpoint.clone()),
1002            ),
1003
1004            // SignatureType::Unencrypted and SignatureType::Encrypted require async signing
1005            sig_ty => {
1006                let endpoint = self.endpoint.clone();
1007
1008                SyncOrAsync::Async(Box::pin(async move {
1009                    let (pub_key, pri_key) = CryptoImpl::gen_ed25519()
1010                        .await
1011                        .map_err(|_| ComHubError::SignatureCreationError)?;
1012
1013                    let raw_signed =
1014                        [pub_key.clone(), block.body.clone()].concat();
1015
1016                    let hashed_signed = CryptoImpl::hash_sha256(&raw_signed)
1017                        .await
1018                        .map_err(|_| ComHubError::SignatureCreationError)?;
1019
1020                    let signature =
1021                        CryptoImpl::sig_ed25519(&pri_key, &hashed_signed)
1022                            .await
1023                            .map_err(|_| ComHubError::SignatureCreationError)?;
1024
1025                    let sig_bytes: Vec<u8> = match sig_ty {
1026                        SignatureType::Unencrypted => signature.to_vec(),
1027
1028                        SignatureType::Encrypted => {
1029                            let hash =
1030                                CryptoImpl::hkdf_sha256(&pub_key, &[0u8; 16])
1031                                    .await
1032                                    .map_err(|_| {
1033                                        ComHubError::SignatureCreationError
1034                                    })?;
1035
1036                            CryptoImpl::aes_ctr_encrypt(
1037                                &hash, &[0u8; 16], &signature,
1038                            )
1039                            .await
1040                            .map_err(|_| ComHubError::SignatureCreationError)?
1041                            .to_vec()
1042                        }
1043
1044                        SignatureType::None => unreachable!("handled above"),
1045                    };
1046
1047                    block.signature = Some([sig_bytes, pub_key].concat());
1048                    update_sender_and_timestamp(block, endpoint)
1049                }))
1050            }
1051        }
1052    }
1053
1054    /// Public method to send an outgoing block from this endpoint. Called by the runtime.
1055    pub async fn send_own_block_async(
1056        &self,
1057        mut block: DXBBlock,
1058    ) -> Result<(), Vec<Endpoint>> {
1059        block = self
1060            .prepare_own_block(block)
1061            .into_result()
1062            .await
1063            .unwrap_or_else(|e| {
1064                panic!("Error preparing own block for sending: {:?}", e)
1065            });
1066
1067        // add own outgoing block to history
1068        self.block_handler
1069            .add_block_id_to_history(block.get_block_id(), None);
1070        self.send_block_async(block, vec![], false).await
1071    }
1072
1073    /// Sends a block from this endpoint synchronously.
1074    /// If any endpoint can not be reached synchronously, an Err with the list of all endpoints is returned.
1075    /// Otherwise, Ok with optional list of responses is returned.
1076    pub fn send_own_block(
1077        &self,
1078        mut block: DXBBlock,
1079    ) -> Result<Option<Vec<Vec<u8>>>, Vec<Endpoint>> {
1080        let receivers = block.receiver_endpoints();
1081        block = match self.prepare_own_block(block) {
1082            SyncOrAsync::Sync(res) => res.unwrap_or_else(|e| {
1083                panic!("Error preparing own block for sending: {:?}", e)
1084            }),
1085            SyncOrAsync::Async(_) => {
1086                return Err(receivers);
1087            }
1088        };
1089        self.block_handler
1090            .add_block_id_to_history(block.get_block_id(), None);
1091        match self.send_block(block, vec![], false) {
1092            BlockSendSyncOrAsyncResult::Sync(res) => res,
1093            BlockSendSyncOrAsyncResult::Async(_) => Err(receivers),
1094        }
1095    }
1096
1097    /// Sends a block and wait for a response block.
1098    /// Fix number of exact endpoints -> Expected responses are known at send time.
1099    /// TODO #189: make sure that mutating blocks are always send to specific endpoint instances (@jonas/0001), not generic endpoints like @jonas.
1100    /// @jonas -> response comes from a specific instance of @jonas/0001
1101    pub async fn send_own_block_await_response(
1102        &self,
1103        block: DXBBlock,
1104        options: ResponseOptions,
1105    ) -> Vec<Result<Response, ResponseError>> {
1106        let context_id = block.block_header.context_id;
1107        let section_index = block.block_header.section_index;
1108
1109        let mut rx = self
1110            .block_handler
1111            .register_incoming_block_observer(context_id, section_index);
1112
1113        let has_exact_receiver_count = block.has_exact_receiver_count();
1114        let receivers = block.receiver_endpoints();
1115
1116        let res = self.send_own_block_async(block).await;
1117        let failed_endpoints = res.err().unwrap_or_default();
1118
1119        let timeout = options
1120            .timeout
1121            .unwrap_or_default(self.options.default_receive_timeout);
1122
1123        // return fixed number of responses
1124        if has_exact_receiver_count {
1125            // if resolution strategy is ReturnOnAnyError or ReturnOnFirstResult, directly return if any endpoint failed
1126            if (options.resolution_strategy
1127                == ResponseResolutionStrategy::ReturnOnAnyError
1128                || options.resolution_strategy
1129                    == ResponseResolutionStrategy::ReturnOnFirstResult)
1130                && !failed_endpoints.is_empty()
1131            {
1132                // for each failed endpoint, set NotReachable error, for all others EarlyAbort
1133                return receivers
1134                    .iter()
1135                    .map(|receiver| {
1136                        if failed_endpoints.contains(receiver) {
1137                            Err(ResponseError::NotReachable(receiver.clone()))
1138                        } else {
1139                            Err(ResponseError::EarlyAbort(receiver.clone()))
1140                        }
1141                    })
1142                    .collect::<Vec<_>>();
1143            }
1144
1145            // store received responses in map for all receivers
1146            let mut responses = HashMap::new();
1147            let mut missing_response_count = receivers.len();
1148            for receiver in &receivers {
1149                responses.insert(
1150                    receiver.clone(),
1151                    if failed_endpoints.contains(receiver) {
1152                        Err(ResponseError::NotReachable(receiver.clone()))
1153                    } else {
1154                        Err(ResponseError::NoResponseAfterTimeout(
1155                            receiver.clone(),
1156                            timeout,
1157                        ))
1158                    },
1159                );
1160            }
1161            // directly subtract number of already failed endpoints from missing responses
1162            missing_response_count -= failed_endpoints.len();
1163
1164            info!(
1165                "Waiting for responses from receivers {}",
1166                receivers
1167                    .iter()
1168                    .map(|e| e.to_string())
1169                    .collect::<Vec<_>>()
1170                    .join(",")
1171            );
1172
1173            let res = task::timeout(timeout, async {
1174                while let Some(section) = rx.next().await {
1175                    let mut received_response = false;
1176                    // get sender
1177                    let mut sender = section.get_sender();
1178                    // add to response for exactly matching endpoint instance
1179                    if let Some(response) = responses.get_mut(&sender) {
1180                        // check if the receiver is already set (= current set response is Err)
1181                        if response.is_err() {
1182                            *response = Ok(Response::ExactResponse(sender.clone(), section));
1183                            missing_response_count -= 1;
1184                            info!("Received expected response from {sender}");
1185                            received_response = true;
1186                        }
1187                        // already received a response from this exact sender - this should not happen
1188                        else {
1189                            error!("Received multiple responses from the same sender: {sender}");
1190                        }
1191                    }
1192                    // add to response for matching endpoint
1193                    else if let Some(matches_endpoint) = self.try_match_sender(&mut responses, &sender) {
1194                        let response = responses.get_mut(&matches_endpoint).unwrap();
1195                        info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1196                        sender = sender.any_instance_endpoint();
1197                        // check if the receiver is already set (= current set response is Err)
1198                        if response.is_err() {
1199                            *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1200                            missing_response_count -= 1;
1201                            received_response = true;
1202                        }
1203                        // already received a response from a matching endpoint - ignore
1204                        else {
1205                            info!("Received multiple resolved responses from the {}", &sender);
1206                        }
1207                    }
1208                    // response from unexpected sender
1209                    else {
1210                        error!("Received response from unexpected sender: {}", &sender);
1211                    }
1212
1213                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1214                    if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1215                        // set all other responses to EarlyAbort
1216                        for (receiver, response) in responses.iter_mut() {
1217                            if receiver != &sender {
1218                                *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1219                            }
1220                        }
1221                        break;
1222                    }
1223
1224                    // if all responses are received, break
1225                    if missing_response_count == 0 {
1226                        break;
1227                    }
1228                }
1229            }).await;
1230
1231            if res.is_err() {
1232                error!("Timeout waiting for responses");
1233            }
1234
1235            // return responses as vector
1236            responses.into_values().collect::<Vec<_>>()
1237        }
1238        // return all received responses
1239        else {
1240            let mut responses = vec![];
1241
1242            let res = task::timeout(timeout, async {
1243                let mut rx =
1244                    self.block_handler.register_incoming_block_observer(
1245                        context_id,
1246                        section_index,
1247                    );
1248                while let Some(section) = rx.next().await {
1249                    // get sender
1250                    let sender = section.get_sender();
1251                    info!("Received response from {sender}");
1252                    // add to response for exactly matching endpoint instance
1253                    responses.push(Ok(Response::UnspecifiedResponse(section)));
1254
1255                    // if resolution strategy is ReturnOnFirstResult, break if any response is received
1256                    if options.resolution_strategy
1257                        == ResponseResolutionStrategy::ReturnOnFirstResult
1258                    {
1259                        break;
1260                    }
1261                }
1262            })
1263            .await;
1264
1265            if res.is_err() {
1266                info!("Timeout waiting for responses");
1267            }
1268
1269            responses
1270        }
1271    }
1272
1273    /// Tries to match the sender endpoint to a more generic endpoint in the responses map (e.g., @jonas/0001 -> @jonas or @@local/0001 -> @xyz) and returns the matching endpoint if found.
1274    fn try_match_sender(
1275        &self,
1276        responses: &mut HashMap<Endpoint, Result<Response, ResponseError>>,
1277        sender: &Endpoint,
1278    ) -> Option<Endpoint> {
1279        let matches = gen {
1280            // match sender but with any wildcard instance
1281            yield sender.any_instance_endpoint();
1282            // match @@local if endpoint is local endpoint
1283            if self.is_local_endpoint_exact(sender) {
1284                yield Endpoint::LOCAL;
1285                yield Endpoint::LOCAL_ALL_INSTANCES;
1286            }
1287        }
1288        .collect::<Vec<Endpoint>>();
1289        for try_match_sender in matches {
1290            let res = responses.get(&try_match_sender);
1291            if let Some(_response) = res {
1292                return Some(try_match_sender);
1293            }
1294        }
1295        None
1296    }
1297
1298    /// Sends a block to all endpoints specified in the block header.
1299    /// Awaits the result if any block was sent via an async interface.
1300    /// See `send_block` for details.
1301    pub async fn send_block_async(
1302        &self,
1303        block: DXBBlock,
1304        exclude_sockets: Vec<ComInterfaceSocketUUID>,
1305        forked: bool,
1306    ) -> Result<(), Vec<Endpoint>> {
1307        match self.send_block(block, exclude_sockets, forked) {
1308            SyncOrAsyncResult::Sync(res) => {
1309                // TODO #735: handle received blocks
1310                res.map(|_| ())
1311            }
1312            SyncOrAsyncResult::Async(fut) => fut.await,
1313        }
1314    }
1315
1316    /// Sends a block to all endpoints specified in the block header.
1317    /// The routing algorithm decides which sockets are used to send the block, based on the endpoint.
1318    /// A block can be sent to multiple endpoints at the same time over a socket or to multiple sockets for each endpoint.
1319    /// The original_socket parameter is used to prevent sending the block back to the sender.
1320    /// When this method is called, the block is queued in the send queue.
1321    /// Returns a SyncOrAsyncResult:
1322    ///  - if all blocks were sent via sync interfaces, returns Sync with Ok containing an optional vector of received blocks (if any), or Err with a list of unreachable endpoints
1323    ///  - if any block was sent via an async interface, returns Async with a Future that resolves to Ok(()) or Err with a list of unreachable endpoints
1324    pub fn send_block(
1325        &self,
1326        mut block: DXBBlock,
1327        exclude_sockets: Vec<ComInterfaceSocketUUID>,
1328        forked: bool,
1329    ) -> BlockSendSyncOrAsyncResult<
1330        impl Future<Output = Result<(), Vec<Endpoint>>>,
1331    > {
1332        let outbound_receiver_groups =
1333            self.socket_manager.get_outbound_receiver_groups(
1334                &self.endpoint,
1335                &block.receiver_endpoints(),
1336                exclude_sockets,
1337            );
1338
1339        if outbound_receiver_groups.is_none() {
1340            error!("No outbound receiver groups found for block");
1341            return SyncOrAsyncResult::Sync(Err(vec![]));
1342        }
1343
1344        let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1345
1346        let mut unreachable_endpoints = vec![];
1347
1348        // currently only used for trace debugging (TODO: put behind debug flag)
1349        // if more than one addressed block is sent, the block is forked, thus the fork count is set to 0
1350        // for each forked block, the fork count is incremented
1351        // if only one block is sent, the block is just moved and not forked
1352        let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1353            Some(0)
1354        } else {
1355            None
1356        };
1357
1358        block.set_bounce_back(false);
1359
1360        let mut results = Vec::new();
1361
1362        for (receiver_socket, endpoints) in outbound_receiver_groups {
1363            if let Some(socket_uuid) = receiver_socket {
1364                results.push((
1365                    endpoints.clone(),
1366                    self.send_block_to_endpoints_via_socket(
1367                        block.clone(),
1368                        socket_uuid,
1369                        endpoints,
1370                        fork_count,
1371                    ),
1372                ));
1373            } else {
1374                error!(
1375                    "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1376                    self.endpoint,
1377                    endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1378                );
1379                unreachable_endpoints.extend(endpoints);
1380            }
1381            // increment fork_count if Some
1382            if let Some(count) = fork_count {
1383                fork_count = Some(count + 1);
1384            }
1385        }
1386
1387        // return error if any unreachable endpoints
1388        if !unreachable_endpoints.is_empty() {
1389            return SyncOrAsyncResult::Sync(Err(unreachable_endpoints));
1390        }
1391
1392        // if all results are sync, return sync
1393        if results
1394            .iter()
1395            .all(|(_, res)| matches!(res, SyncOrAsync::Sync(_)))
1396        {
1397            let mut received_blocks = Vec::new();
1398            for (endpoints, res) in results {
1399                match res {
1400                    SyncOrAsync::Sync(r) => {
1401                        match r {
1402                            Ok(Some(data)) => {
1403                                received_blocks.push(data); // TODO #736: already DXBBlocks here?
1404                            }
1405                            Ok(None) => { /* no data */ }
1406                            Err(_) => {
1407                                unreachable_endpoints.extend(endpoints);
1408                            }
1409                        }
1410                    }
1411                    _ => unreachable!(),
1412                }
1413            }
1414            if !unreachable_endpoints.is_empty() {
1415                SyncOrAsyncResult::Sync(Err(unreachable_endpoints))
1416            } else {
1417                SyncOrAsyncResult::Sync(Ok(Some(received_blocks)))
1418            }
1419        }
1420        // otherwise return async
1421        else {
1422            SyncOrAsyncResult::Async(async move {
1423                let futures =
1424                    results.into_iter().map(|(endpoints, res)| async move {
1425                        match res {
1426                            SyncOrAsync::Sync(r) => {
1427                                // TODO #737 directly process received blocks
1428                                r.map(|_data| ()).map_err(|_| endpoints)
1429                            }
1430                            SyncOrAsync::Async(fut) => {
1431                                fut.await.map_err(|_| endpoints)
1432                            }
1433                        }
1434                    });
1435
1436                let res = futures::future::join_all(futures).await;
1437                // merge all unreachable endpoints and return err if any
1438                let all_unreachable_endpoints = res
1439                    .into_iter()
1440                    .filter_map(|r| r.err())
1441                    .flatten()
1442                    .collect::<Vec<_>>();
1443                if !all_unreachable_endpoints.is_empty() {
1444                    Err(all_unreachable_endpoints)
1445                } else {
1446                    Ok(())
1447                }
1448            })
1449        }
1450    }
1451
1452    /// Sends a block via a socket to a list of endpoints.
1453    /// Before the block is sent, it is modified to include the list of endpoints as receivers.
1454    fn send_block_to_endpoints_via_socket(
1455        &self,
1456        mut block: DXBBlock,
1457        socket_uuid: ComInterfaceSocketUUID,
1458        endpoints: Vec<Endpoint>,
1459        // currently only used for trace debugging (TODO: put behind debug flag)
1460        fork_count: Option<usize>,
1461    ) -> SyncOrAsyncResult<
1462        Option<Vec<u8>>,
1463        (),
1464        SendFailure,
1465        impl Future<Output = Result<(), SendFailure>>,
1466    > {
1467        block.set_receivers(&endpoints);
1468
1469        let socket_data =
1470            match self.socket_manager.get_socket_by_uuid(&socket_uuid) {
1471                Some(socket_data) => socket_data,
1472                None => {
1473                    return SyncOrAsyncResult::Sync(Err(SendFailure(
1474                        Box::new(block),
1475                    )));
1476                }
1477            };
1478
1479        // assuming the distance was already increment during redirect, we
1480        // effectively decrement the block distance by 1 if it is a bounce back
1481        if block.is_bounce_back() {
1482            block.routing_header.distance -= 2;
1483        }
1484
1485        // if type is Trace or TraceBack, add the outgoing socket to the hops
1486        // NOTE: the block body is modified here for trace blocks, which would invalidate any signature
1487        // generated for the block. However, trace blocks are only used for debugging purposes and not expected to be signed.
1488        match block.block_header.flags_and_timestamp.block_type() {
1489            BlockType::Trace | BlockType::TraceBack => {
1490                let distance = block.routing_header.distance;
1491                let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1492                let bounce_back = block.is_bounce_back();
1493
1494                self.add_hop_to_block_trace_data(
1495                    &mut block,
1496                    NetworkTraceHop {
1497                        endpoint: self.endpoint.clone(),
1498                        distance,
1499                        socket: NetworkTraceHopSocket::new(
1500                            &socket_data.interface_properties,
1501                            socket_uuid.clone(),
1502                        ),
1503                        direction: NetworkTraceHopDirection::Outgoing,
1504                        fork_nr: new_fork_nr,
1505                        bounce_back,
1506                    },
1507                );
1508            }
1509            _ => {}
1510        }
1511
1512        let is_broadcast = endpoints
1513            .iter()
1514            .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1515
1516        // Break loop and don't relay broadcast blocks back to socket with direct endpoint set to self
1517        if is_broadcast
1518            && let Some(direct_endpoint) =
1519                &socket_data.socket_properties.direct_endpoint
1520            && self.is_local_endpoint_exact(direct_endpoint)
1521        {
1522            return SyncOrAsyncResult::Sync(Ok(None));
1523        }
1524        for interceptor in self.outgoing_block_interceptors.borrow().iter() {
1525            interceptor(&block, &socket_uuid, &endpoints);
1526        }
1527        info!(
1528            "Sending block to socket {}: {}",
1529            socket_uuid,
1530            endpoints.iter().map(|e| e.to_string()).join(", ")
1531        );
1532
1533        // TODO #190: resend block if socket failed to send
1534        if let Some(send_callback) = socket_data.send_callback.clone() {
1535            match send_callback {
1536                SendCallback::Sync(callback)
1537                | SendCallback::SyncOnce(callback) => SyncOrAsyncResult::Sync(
1538                    callback(block).map(|send_success| match send_success {
1539                        SendSuccess::SentWithNewIncomingData(data) => {
1540                            Some(data)
1541                        }
1542                        _ => None,
1543                    }),
1544                ),
1545                SendCallback::Async(callback) => {
1546                    SyncOrAsyncResult::Async(async move {
1547                        callback.call(block).await.map(|_| ())
1548                    })
1549                }
1550            }
1551        } else {
1552            panic!("No send callback registered for socket {}", socket_uuid);
1553        }
1554    }
1555
1556    /// Sends a hello block via the specified socket.
1557    /// Returns Ok(()) if the block was sent successfully, or Err(SendFailure) if sending failed.
1558    pub async fn send_hello_block(
1559        &self,
1560        socket_uuid: ComInterfaceSocketUUID,
1561    ) -> Result<(), SendFailure> {
1562        let mut block: DXBBlock = DXBBlock::default();
1563        block
1564            .block_header
1565            .flags_and_timestamp
1566            .set_block_type(BlockType::Hello);
1567        // TODO #182 include fingerprint of the own public key into body
1568
1569        let block = self
1570            .prepare_own_block(block)
1571            .into_result()
1572            .await
1573            .unwrap_or_else(|e| {
1574                panic!("Error preparing own block for sending: {:?}", e)
1575            });
1576
1577        match self
1578            .send_block_to_endpoints_via_socket(
1579                block,
1580                socket_uuid.clone(),
1581                vec![Endpoint::ANY],
1582                None,
1583            )
1584            .into_future()
1585            .await
1586        {
1587            SyncOrAsyncResolved::Sync(r) => r.map(|_| ()),
1588            SyncOrAsyncResolved::Async(fut) => fut,
1589        }
1590    }
1591
1592    pub fn clear_endpoint_blacklist(&self) {
1593        self.socket_manager
1594            .endpoint_sockets_blacklist
1595            .borrow_mut()
1596            .clear();
1597    }
1598
1599    pub fn interfaces_manager(&self) -> &ComInterfaceManager {
1600        &self.interfaces_manager
1601    }
1602
1603    pub fn socket_manager(&self) -> &ComInterfaceSocketManager {
1604        &self.socket_manager
1605    }
1606}
1607
1608#[cfg(test)]
1609#[cfg(feature = "crypto_enabled")]
1610pub mod tests {
1611    use crate::{
1612        channel::mpsc::{
1613            UnboundedReceiver, UnboundedSender, create_unbounded_channel,
1614        },
1615        global::{
1616            dxb_block::{DXBBlock, IncomingSection},
1617            protocol_structures::{
1618                block_header::{BlockHeader, FlagsAndTimestamp},
1619                routing_header::RoutingHeader,
1620            },
1621        },
1622        network::{
1623            com_hub::{
1624                ComHub, InterfacePriority,
1625                errors::ComInterfaceCreateError,
1626                managers::com_interface_manager::{
1627                    ComInterfaceAsyncFactoryResult, ComInterfaceManager,
1628                },
1629                test_utils::{
1630                    TEST_ENDPOINT_A, TEST_ENDPOINT_B, TEST_ENDPOINT_C,
1631                    get_endpoints_from_com_hub_metadata,
1632                    run_with_coupled_com_hubs,
1633                },
1634            },
1635            com_interfaces::com_interface::{
1636                factory::{
1637                    ComInterfaceAsyncFactory, ComInterfaceConfiguration,
1638                    ComInterfaceSyncFactory, SendCallback, SendSuccess,
1639                    SocketConfiguration, SocketProperties,
1640                },
1641                properties::{ComInterfaceProperties, InterfaceDirection},
1642            },
1643        },
1644        prelude::*,
1645        std_sync::Mutex,
1646        values::core_values::endpoint::Endpoint,
1647    };
1648    use alloc::rc::Rc;
1649    use async_select::select;
1650    use futures_util::FutureExt;
1651    use log::info;
1652    use serde::Deserialize;
1653    use tokio::task::yield_now;
1654
1655    /// Creates a mock ComHub for testing without a connected channel
1656    async fn run_with_com_hub<AppReturn, AppFuture>(
1657        app_logic: impl FnOnce(
1658            Rc<ComHub>,
1659            UnboundedReceiver<IncomingSection>,
1660        ) -> AppFuture,
1661    ) -> AppReturn
1662    where
1663        AppFuture: Future<Output = AppReturn>,
1664    {
1665        let (sender, receiver) = create_unbounded_channel();
1666        let (com_hub, com_hub_future) =
1667            ComHub::create(TEST_ENDPOINT_A.clone(), sender);
1668        select! {
1669            app_result = app_logic(com_hub, receiver).fuse() => app_result,
1670            _ = com_hub_future.fuse() => panic!("ComHub future should not complete during the test"),
1671        }
1672    }
1673
1674    async fn add_proxy_interface_to_com_hub(
1675        com_hub: Rc<ComHub>,
1676        endpoint: Endpoint,
1677    ) -> (UnboundedSender<Vec<u8>>, UnboundedReceiver<DXBBlock>) {
1678        let (outgoing_block_sender, outgoing_block_receiver) =
1679            create_unbounded_channel();
1680        let (incoming_data_sender, mut incoming_data_receiver) =
1681            create_unbounded_channel::<Vec<u8>>();
1682        let outgoing_block_sender = Rc::new(Mutex::new(outgoing_block_sender));
1683
1684        let proxy_interface_configuration =
1685            ComInterfaceConfiguration::new_single_socket(
1686                ComInterfaceProperties {
1687                    interface_type: "proxy".to_string(),
1688                    channel: "proxy".to_string(),
1689                    name: Some("proxy".to_string()),
1690                    ..Default::default()
1691                },
1692                SocketConfiguration::new_in_out(
1693                    SocketProperties::new_with_direct_endpoint(
1694                        InterfaceDirection::InOut,
1695                        1,
1696                        endpoint,
1697                    ),
1698                    async gen move {
1699                        while let Some(block) =
1700                            incoming_data_receiver.next().await
1701                        {
1702                            yield Ok(block)
1703                        }
1704                    },
1705                    SendCallback::new_sync(move |block| {
1706                        outgoing_block_sender
1707                            .try_lock()
1708                            .unwrap()
1709                            .start_send(block)
1710                            .unwrap();
1711                        Ok(SendSuccess::Sent)
1712                    }),
1713                ),
1714            );
1715        com_hub
1716            .clone()
1717            .add_interface_from_configuration(
1718                proxy_interface_configuration,
1719                InterfacePriority::None,
1720            )
1721            .unwrap()
1722            .unwrap()
1723            .await
1724            .unwrap();
1725
1726        (incoming_data_sender, outgoing_block_receiver)
1727    }
1728
1729    async fn run_with_com_hub_and_proxy_interface<AppReturn, AppFuture>(
1730        app_logic: impl FnOnce(
1731            Rc<ComHub>,
1732            UnboundedSender<Vec<u8>>,
1733            UnboundedReceiver<DXBBlock>,
1734            UnboundedReceiver<IncomingSection>,
1735        ) -> AppFuture,
1736    ) -> AppReturn
1737    where
1738        AppFuture: Future<Output = AppReturn>,
1739    {
1740        run_with_com_hub(|com_hub, incoming_sections_receiver| async move {
1741            let (incoming_data_sender, outgoing_block_receiver) =
1742                add_proxy_interface_to_com_hub(
1743                    com_hub.clone(),
1744                    TEST_ENDPOINT_B.clone(),
1745                )
1746                .await;
1747            app_logic(
1748                com_hub,
1749                incoming_data_sender,
1750                outgoing_block_receiver,
1751                incoming_sections_receiver,
1752            )
1753            .await
1754        })
1755        .await
1756    }
1757
1758    async fn send_blocks_to_endpoint(
1759        com_hub: Rc<ComHub>,
1760        incoming_data_sender: &mut UnboundedSender<Vec<u8>>,
1761        endpoint: Endpoint,
1762        blocks: &mut Vec<DXBBlock>,
1763    ) {
1764        for mut block in blocks {
1765            block.set_receivers(vec![endpoint.clone()]);
1766
1767            *block = com_hub
1768                .prepare_own_block(block.clone())
1769                .into_result()
1770                .await
1771                .unwrap();
1772
1773            let block_bytes = block.to_bytes();
1774            incoming_data_sender
1775                .start_send(block_bytes.as_slice().to_vec())
1776                .unwrap();
1777        }
1778    }
1779
1780    #[derive(Clone, Copy, PartialEq, Eq, Debug, Default)]
1781    pub enum CollectedBlockType {
1782        #[default]
1783        All,
1784        SingleBocks,
1785        BlockStream,
1786    }
1787
1788    impl CollectedBlockType {
1789        pub fn matches_section(&self, section: &IncomingSection) -> bool {
1790            match self {
1791                CollectedBlockType::SingleBocks => {
1792                    matches!(section, IncomingSection::SingleBlock(_))
1793                }
1794                CollectedBlockType::BlockStream => {
1795                    matches!(section, IncomingSection::BlockStream(_))
1796                }
1797                CollectedBlockType::All => true,
1798            }
1799        }
1800    }
1801
1802    pub async fn get_collected_received_blocks_from_receiver(
1803        sections_receiver: &mut UnboundedReceiver<IncomingSection>,
1804        collected_type: CollectedBlockType,
1805        count: usize,
1806    ) -> Vec<DXBBlock> {
1807        let mut blocks = vec![];
1808
1809        let mut received_count = 0;
1810
1811        while let Some(section) = sections_receiver.next().await {
1812            if !collected_type.matches_section(&section) {
1813                panic!(
1814                    "Received section does not match collected block type {:?}",
1815                    collected_type
1816                );
1817            }
1818
1819            match section {
1820                IncomingSection::SingleBlock((Some(block), ..)) => {
1821                    blocks.push(block.clone());
1822                    received_count += 1;
1823                    info!("Received single block");
1824                }
1825                IncomingSection::BlockStream((Some(mut block_stream), ..)) => {
1826                    info!("[START] block stream");
1827                    while let Some(block) = block_stream.next().await {
1828                        received_count += 1;
1829                        blocks.push(block.clone());
1830                        info!("Received block from stream");
1831
1832                        if received_count >= count {
1833                            break;
1834                        }
1835                    }
1836                    info!("[END] receiving block stream");
1837                }
1838                _ => {
1839                    panic!("Received section does not contain a block");
1840                }
1841            }
1842
1843            if received_count >= count {
1844                break;
1845            }
1846        }
1847
1848        if blocks.len() != count {
1849            panic!(
1850                "Expected to receive {} blocks, but got {}",
1851                count,
1852                blocks.len()
1853            );
1854        }
1855
1856        blocks
1857    }
1858
1859    #[derive(Debug, Clone, Deserialize)]
1860    struct MockupInterfaceSetupData {
1861        pub name: String,
1862    }
1863    impl MockupInterfaceSetupData {
1864        pub fn new(name: &str) -> Self {
1865            Self {
1866                name: name.to_string(),
1867            }
1868        }
1869    }
1870
1871    impl ComInterfaceSyncFactory for MockupInterfaceSetupData {
1872        fn create_interface(
1873            self,
1874        ) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError>
1875        {
1876            Ok(ComInterfaceConfiguration::new_single_socket(
1877                ComInterfaceProperties::default(),
1878                SocketConfiguration::new_in_out(
1879                    SocketProperties::new(InterfaceDirection::InOut, 1),
1880                    async gen move {
1881                        loop {
1882                            yield Ok(vec![]);
1883                        }
1884                    },
1885                    SendCallback::new_sync(|_| Ok(SendSuccess::Sent)),
1886                ),
1887            ))
1888        }
1889
1890        fn get_default_properties() -> ComInterfaceProperties {
1891            ComInterfaceProperties {
1892                name: Some("mockup".to_string()),
1893                ..Default::default()
1894            }
1895        }
1896    }
1897
1898    impl ComInterfaceAsyncFactory for MockupInterfaceSetupData {
1899        fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
1900            Box::pin(
1901                async move { ComInterfaceSyncFactory::create_interface(self) },
1902            )
1903        }
1904        fn get_default_properties() -> ComInterfaceProperties {
1905            <MockupInterfaceSetupData as ComInterfaceSyncFactory>::get_default_properties()
1906        }
1907    }
1908
1909    #[tokio::test]
1910    pub async fn create_from_sync_factory() {
1911        run_with_com_hub(|com_hub, _| async move {
1912            let interface_configuration =
1913                ComInterfaceManager::create_interface_sync_from_setup_data(
1914                    MockupInterfaceSetupData::new("test"),
1915                )
1916                .unwrap();
1917            let uuid = interface_configuration.uuid().clone();
1918
1919            com_hub
1920                .clone()
1921                .add_interface_from_configuration(
1922                    interface_configuration,
1923                    InterfacePriority::default(),
1924                )
1925                .unwrap();
1926
1927            assert!(com_hub.remove_interface(uuid).await.is_ok());
1928        })
1929        .await;
1930    }
1931
1932    #[tokio::test]
1933    pub async fn create_from_async_factory() {
1934        run_with_com_hub(|com_hub, _| async move {
1935            let interface_configuration =
1936                ComInterfaceManager::create_interface_async_from_setup_data(
1937                    MockupInterfaceSetupData::new("test"),
1938                )
1939                .await
1940                .unwrap();
1941            let uuid = interface_configuration.uuid().clone();
1942
1943            com_hub
1944                .clone()
1945                .add_interface_from_configuration(
1946                    interface_configuration,
1947                    InterfacePriority::default(),
1948                )
1949                .unwrap();
1950
1951            assert!(com_hub.remove_interface(uuid).await.is_ok());
1952        })
1953        .await;
1954    }
1955
1956    #[tokio::test]
1957    async fn create_hello_connection() {
1958        run_with_coupled_com_hubs(async |a, b| {
1959            let com_hub_a_sockets =
1960                get_endpoints_from_com_hub_metadata(a.com_hub.get_metadata());
1961            assert!(
1962                com_hub_a_sockets
1963                    .contains(&(Some(TEST_ENDPOINT_B.clone()), Some(1)))
1964            );
1965
1966            let com_hub_b_sockets =
1967                get_endpoints_from_com_hub_metadata(b.com_hub.get_metadata());
1968            assert!(
1969                com_hub_b_sockets
1970                    .contains(&(Some(TEST_ENDPOINT_A.clone()), Some(1)))
1971            );
1972        })
1973        .await;
1974    }
1975
1976    #[tokio::test]
1977    pub async fn test_send() {
1978        run_with_com_hub_and_proxy_interface(
1979            async move |com_hub, _, mut outgoing_block_receiver, _| {
1980                // send block via com hub to proxy interface
1981                let mut block = DXBBlock::new_with_body(b"Hello world!");
1982                block.set_receivers(vec![TEST_ENDPOINT_B.clone()]);
1983                com_hub.send_own_block_async(block).await.unwrap();
1984
1985                // hello block, skip
1986                outgoing_block_receiver.next().await.unwrap();
1987
1988                // get next outgoing block that was sent via the proxy interface
1989                let outgoing_block =
1990                    outgoing_block_receiver.next().await.unwrap();
1991                assert_eq!(outgoing_block.body, b"Hello world!");
1992            },
1993        )
1994        .await;
1995    }
1996
1997    #[tokio::test]
1998    pub async fn test_send_between_com_hubs() {
1999        run_with_coupled_com_hubs(async |a, mut b| {
2000            // send block via com hub to proxy interface
2001            let mut block = DXBBlock::new_with_body(b"Hello world!");
2002            block.set_receivers(vec![TEST_ENDPOINT_B.clone()]);
2003            a.com_hub.send_own_block_async(block).await.unwrap();
2004
2005            // get received single block on com hub b
2006            let next_block = b.incoming_sections_receiver.next().await.unwrap();
2007            if let IncomingSection::SingleBlock((Some(block), _)) = next_block {
2008                assert_eq!(block.body, b"Hello world!");
2009            } else {
2010                panic!("Expected single block section");
2011            }
2012        })
2013        .await;
2014    }
2015
2016    #[tokio::test]
2017    pub async fn test_send_block_to_invalid_receiver() {
2018        run_with_com_hub_and_proxy_interface(async move |com_hub, _, _, _| {
2019            let mut block = DXBBlock::new_with_body(b"Hello world!");
2020            // cannot send to endpoint c, since only interface is not a fallback interface and only knows endpoint b
2021            block.set_receivers(vec![TEST_ENDPOINT_C.clone()]);
2022            let res = com_hub.send_own_block_async(block).await;
2023            assert!(res.is_err());
2024        })
2025        .await;
2026    }
2027
2028    #[tokio::test]
2029    pub async fn send_block_via_multiple_interfaces() {
2030        run_with_com_hub(|com_hub, _| async move {
2031            let (_sender_b, mut outgoing_block_receiver_b) =
2032                add_proxy_interface_to_com_hub(
2033                    com_hub.clone(),
2034                    TEST_ENDPOINT_B.clone(),
2035                )
2036                .await;
2037            let (_sender_c, mut outgoing_block_receiver_c) =
2038                add_proxy_interface_to_com_hub(
2039                    com_hub.clone(),
2040                    TEST_ENDPOINT_C.clone(),
2041                )
2042                .await;
2043            com_hub.print_metadata();
2044
2045            let mut block = DXBBlock::new_with_body(b"Hello world!");
2046            block.set_receivers(vec![
2047                TEST_ENDPOINT_B.clone(),
2048                TEST_ENDPOINT_C.clone(),
2049            ]);
2050            com_hub.send_own_block_async(block).await.unwrap();
2051
2052            // skip hello blocks
2053            outgoing_block_receiver_b.next().await.unwrap();
2054            outgoing_block_receiver_c.next().await.unwrap();
2055
2056            // block should be sent via both interfaces
2057            let outgoing_block_b =
2058                outgoing_block_receiver_b.next().await.unwrap();
2059            let outgoing_block_c =
2060                outgoing_block_receiver_c.next().await.unwrap();
2061
2062            info!("block sender b: {}", outgoing_block_b.sender());
2063            info!("block sender c: {}", outgoing_block_c.sender());
2064            assert_eq!(outgoing_block_b.body, b"Hello world!");
2065            assert_eq!(outgoing_block_c.body, b"Hello world!");
2066        })
2067        .await
2068    }
2069
2070    #[tokio::test]
2071    pub async fn test_receive() {
2072        flexi_logger::init();
2073        run_with_com_hub_and_proxy_interface(
2074            async move |com_hub,
2075                        mut incoming_data_sender,
2076                        _,
2077                        mut incoming_sections_receiver| {
2078                // create block and send it via the incoming data sender to the com hub
2079                let mut block = DXBBlock::new_with_body(b"Hello world!");
2080                block.set_receivers(vec![TEST_ENDPOINT_A.clone()]);
2081
2082                let block = com_hub
2083                    .prepare_own_block(block)
2084                    .into_result()
2085                    .await
2086                    .unwrap();
2087
2088                let block_bytes = block.to_bytes();
2089                incoming_data_sender
2090                    .start_send(block_bytes.as_slice().to_vec())
2091                    .unwrap();
2092
2093                let incoming_section =
2094                    incoming_sections_receiver.next().await.unwrap();
2095                if let IncomingSection::SingleBlock((Some(block), _)) =
2096                    incoming_section
2097                {
2098                    assert_eq!(block.raw_bytes.clone().unwrap(), block_bytes);
2099                } else {
2100                    panic!("Expected single block section");
2101                }
2102            },
2103        )
2104        .await;
2105    }
2106
2107    #[tokio::test]
2108    pub async fn test_receive_multiple_blocks_single_section() {
2109        run_with_com_hub_and_proxy_interface(
2110            async move |com_hub,
2111                        mut incoming_data_sender,
2112                        _,
2113                        mut incoming_sections_receiver| {
2114                let mut blocks = vec![
2115                    DXBBlock {
2116                        routing_header: RoutingHeader::default(),
2117                        block_header: BlockHeader {
2118                            section_index: 0,
2119                            block_number: 0,
2120                            flags_and_timestamp: FlagsAndTimestamp::new()
2121                                .with_is_end_of_section(false)
2122                                .with_is_end_of_context(false),
2123                            ..Default::default()
2124                        },
2125                        ..Default::default()
2126                    },
2127                    DXBBlock {
2128                        routing_header: RoutingHeader::default(),
2129                        block_header: BlockHeader {
2130                            section_index: 0,
2131                            block_number: 1,
2132                            flags_and_timestamp: FlagsAndTimestamp::new()
2133                                .with_is_end_of_section(false)
2134                                .with_is_end_of_context(false),
2135                            ..Default::default()
2136                        },
2137                        ..Default::default()
2138                    },
2139                    DXBBlock {
2140                        routing_header: RoutingHeader::default(),
2141                        block_header: BlockHeader {
2142                            section_index: 0,
2143                            block_number: 2,
2144                            flags_and_timestamp: FlagsAndTimestamp::new()
2145                                .with_is_end_of_section(true)
2146                                .with_is_end_of_context(true),
2147                            ..Default::default()
2148                        },
2149                        ..Default::default()
2150                    },
2151                ];
2152                let blocks_count = blocks.len();
2153
2154                send_blocks_to_endpoint(
2155                    com_hub.clone(),
2156                    &mut incoming_data_sender,
2157                    TEST_ENDPOINT_A.clone(),
2158                    &mut blocks,
2159                )
2160                .await;
2161
2162                let incoming_blocks =
2163                    get_collected_received_blocks_from_receiver(
2164                        &mut incoming_sections_receiver,
2165                        CollectedBlockType::BlockStream,
2166                        blocks_count,
2167                    )
2168                    .await;
2169
2170                for (incoming_block, block) in
2171                    incoming_blocks.iter().zip(blocks.iter())
2172                {
2173                    assert_eq!(
2174                        incoming_block.raw_bytes.clone().unwrap(),
2175                        block.to_bytes()
2176                    );
2177                }
2178            },
2179        )
2180        .await;
2181    }
2182
2183    #[tokio::test]
2184    pub async fn test_receive_multiple_separate_blocks() {
2185        run_with_com_hub_and_proxy_interface(
2186            async move |com_hub,
2187                        mut incoming_data_sender,
2188                        _,
2189                        mut incoming_sections_receiver| {
2190                let mut blocks = vec![
2191                    DXBBlock {
2192                        routing_header: RoutingHeader::default(),
2193                        block_header: BlockHeader {
2194                            section_index: 1,
2195                            block_number: 0,
2196                            ..Default::default()
2197                        },
2198                        ..Default::default()
2199                    },
2200                    DXBBlock {
2201                        routing_header: RoutingHeader::default(),
2202                        block_header: BlockHeader {
2203                            section_index: 2,
2204                            block_number: 0,
2205                            ..Default::default()
2206                        },
2207                        ..Default::default()
2208                    },
2209                    DXBBlock {
2210                        routing_header: RoutingHeader::default(),
2211                        block_header: BlockHeader {
2212                            section_index: 3,
2213                            block_number: 0,
2214                            ..Default::default()
2215                        },
2216                        ..Default::default()
2217                    },
2218                ];
2219                let blocks_count = blocks.len();
2220
2221                send_blocks_to_endpoint(
2222                    com_hub.clone(),
2223                    &mut incoming_data_sender,
2224                    TEST_ENDPOINT_A.clone(),
2225                    &mut blocks,
2226                )
2227                .await;
2228
2229                let incoming_blocks =
2230                    get_collected_received_blocks_from_receiver(
2231                        &mut incoming_sections_receiver,
2232                        CollectedBlockType::SingleBocks,
2233                        blocks_count,
2234                    )
2235                    .await;
2236
2237                for (incoming_block, block) in
2238                    incoming_blocks.iter().zip(blocks.iter())
2239                {
2240                    assert_eq!(
2241                        incoming_block.raw_bytes.clone().unwrap(),
2242                        block.to_bytes()
2243                    );
2244                }
2245            },
2246        )
2247        .await;
2248    }
2249
2250    // #[async_test]
2251    // pub async fn unencrypted_signature_prepare_block_com_hub() {
2252    //     let (com_hub, interface_proxy, mut incoming_sections_receiver) =
2253    //         get_default_mock_setup_with_com_hub().await;
2254    //
2255    //     // receive block
2256    //     let mut block = DXBBlock {
2257    //         body: vec![0x01, 0x02, 0x03],
2258    //         encrypted_header: EncryptedHeader {
2259    //             flags: encrypted_header::Flags::new()
2260    //                 .with_user_agent(encrypted_header::UserAgent::Unused11),
2261    //             ..Default::default()
2262    //         },
2263    //         ..DXBBlock::default()
2264    //     };
2265    //     block.set_receivers(vec![TEST_ENDPOINT_ORIGIN.clone()]);
2266    //     block.recalculate_struct();
2267    //     block
2268    //         .routing_header
2269    //         .flags
2270    //         .set_signature_type(SignatureType::Unencrypted);
2271    //     block = com_hub.prepare_own_block(block).await.unwrap();
2272    //     let block_bytes = block.to_bytes();
2273    //
2274    //     let (_, mut incoming_blocks_sender) =
2275    //         interface_proxy.create_and_init_socket(InterfaceDirection::In, 0);
2276    //     yield_now().await;
2277    //
2278    //     incoming_blocks_sender
2279    //         .start_send(block_bytes.as_slice().to_vec())
2280    //         .unwrap();
2281    //
2282    //     yield_now().await;
2283    //
2284    //     let last_block = get_next_received_single_block_from_receiver(
2285    //         &mut incoming_sections_receiver,
2286    //     )
2287    //     .await;
2288    //     assert_eq!(last_block.raw_bytes.clone().unwrap(), block_bytes);
2289    //     assert_eq!(block.signature, last_block.signature);
2290    //
2291    //     assert!(com_hub.validate_block(&last_block).await.unwrap());
2292    // }
2293    //
2294    // #[async_test]
2295    // pub async fn encrypted_signature_prepare_block_com_hub() {
2296    //     let (com_hub, interface_proxy, mut incoming_sections_receiver) =
2297    //         get_default_mock_setup_with_com_hub().await;
2298    //
2299    //     // build the block that is later sent into the incoming socket and read back
2300    //     let mut block = DXBBlock {
2301    //         body: vec![0x01, 0x02, 0x03],
2302    //         encrypted_header: EncryptedHeader {
2303    //             flags: encrypted_header::Flags::new()
2304    //                 .with_user_agent(encrypted_header::UserAgent::Unused11),
2305    //             ..Default::default()
2306    //         },
2307    //         ..DXBBlock::default()
2308    //     };
2309    //
2310    //     block.set_receivers(vec![TEST_ENDPOINT_ORIGIN.clone()]);
2311    //     block.recalculate_struct();
2312    //
2313    //     block
2314    //         .routing_header
2315    //         .flags
2316    //         .set_signature_type(SignatureType::Encrypted);
2317    //     block = com_hub.prepare_own_block(block).await.unwrap();
2318    //     let block_bytes = block.to_bytes();
2319    //
2320    //     let (_, mut incoming_blocks_sender) =
2321    //         interface_proxy.create_and_init_socket(InterfaceDirection::In, 0);
2322    //     yield_now().await;
2323    //
2324    //     incoming_blocks_sender
2325    //         .start_send(block_bytes.as_slice().to_vec())
2326    //         .unwrap();
2327    //
2328    //     yield_now().await;
2329    //
2330    //     let last_block = get_next_received_single_block_from_receiver(
2331    //         &mut incoming_sections_receiver,
2332    //     )
2333    //     .await;
2334    //     assert_eq!(last_block.raw_bytes.clone().unwrap(), block_bytes);
2335    //     assert_eq!(block.signature, last_block.signature);
2336    //
2337    //     assert!(com_hub.validate_block(&last_block).await.unwrap());
2338    // }
2339    //
2340    //
2341    //
2342    // #[async_test]
2343    // pub async fn test_reconnect() {
2344    //     let com_hub = create_mock_com_hub();
2345    //
2346    //     // TODO #738: refactor using proxy
2347    //
2348    //     // create a new interface, open it and add it to the com_hub
2349    //     let (base_interface, interface_with_receivers) =
2350    //         ComInterfaceProxy::create_interface(ComInterfaceProperties::default());
2351    //
2352    //     // add base_interface to com_hub
2353    //     com_hub
2354    //         ._register_com_interface(
2355    //             interface_with_receivers,
2356    //             InterfacePriority::default(),
2357    //         )
2358    //         .unwrap();
2359    //
2360    //     // check that the interface is connected
2361    //     assert_eq!(
2362    //         base_interface.state.lock().unwrap().get(),
2363    //         ComInterfaceState::Connected
2364    //     );
2365    //
2366    //     // check that the interface is in the com_hub
2367    //     assert_eq!(com_hub.interface_manager().borrow().interfaces.len(), 2); // loopback + base_interface
2368    //     assert!(com_hub.has_interface(&base_interface.uuid));
2369    //
2370    //     // simulate a disconnection by closing the interface
2371    //     // This action is normally done by the interface itself
2372    //     // but we do it manually here to test the reconnection
2373    //     // TODO #739: reconnect
2374    //     // // check that the interface is not connected
2375    //     // // and that the close_timestamp is set
2376    //     // assert_eq!(
2377    //     //     base_interface.state.lock().unwrap().get(),
2378    //     //     ComInterfaceState::NotConnected
2379    //     // );
2380    //     //
2381    //     // assert!(
2382    //     //     base_interface
2383    //     //         .com_interface
2384    //     //         .properties()
2385    //     //         .close_timestamp
2386    //     //         .is_some()
2387    //     // );
2388    //     //
2389    //     // // the interface should not be reconnected yet
2390    //     // yield_now().await;
2391    //     //
2392    //     // assert_eq!(
2393    //     //     base_interface.com_interface.current_state(),
2394    //     //     ComInterfaceState::NotConnected
2395    //     // );
2396    //     //
2397    //     // // wait for the reconnection to happen
2398    //     // tokio::time::sleep(Duration::from_secs(1)).await;
2399    //     //
2400    //     // // check that the interface is connected again
2401    //     // // and that the close_timestamp is reset
2402    //     // yield_now().await;
2403    //     //
2404    //     // assert_eq!(
2405    //     //     base_interface.com_interface.current_state(),
2406    //     //     ComInterfaceState::Connected
2407    //     // );
2408    // }
2409}