1use crate::{
2 channel::mpsc::UnboundedSender,
3 collections::HashMap,
4 global::protocol_structures::{
5 block_header::BlockType, routing_header::SignatureType,
6 },
7 network::com_hub::{
8 errors::{ComHubError, SocketEndpointRegistrationError},
9 managers::com_interface_manager::ComInterfaceManager,
10 network_response::{
11 Response, ResponseError, ResponseOptions,
12 ResponseResolutionStrategy,
13 },
14 options::ComHubOptions,
15 },
16 task,
17 utils::maybe_async::SyncOrAsyncResolved,
18};
19
20use crate::prelude::*;
21
22pub mod managers;
23
24pub mod metadata;
25use crate::network::com_hub::managers::socket_manager::{
26 ComInterfaceSocketManager, SocketCloseReceiver, SocketData,
27};
28
29pub mod errors;
30pub mod network_response;
31
32pub mod network_tracing;
33use crate::network::com_interfaces::com_interface::socket::ComInterfaceSocketUUID;
34use core::{
35 cell::RefCell,
36 cmp::PartialEq,
37 fmt::{Debug, Formatter},
38 panic,
39 pin::Pin,
40 result::Result,
41};
42use itertools::Itertools;
43use log::{debug, error, info, warn};
44use serde::{Deserialize, Serialize};
45
46pub mod options;
47use crate::{
48 global::dxb_block::{DXBBlock, IncomingSection},
49 network::{
50 block_handler::{BlockHandler, BlockHistoryData},
51 com_hub::network_tracing::{
52 NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket,
53 },
54 },
55 values::core_values::endpoint::Endpoint,
56};
57pub mod com_hub_interface;
58#[cfg(test)]
59pub mod test_utils;
60mod com_hub_socket;
61
62use crate::{
63 collections::HashSet,
64 crypto::CryptoImpl,
65 global::dxb_block::{BlockId, SignatureValidationError},
66 network::{
67 com_hub::managers::com_interface_manager::InterfaceCloseReceiver,
68 com_interfaces::{
69 block_collector::BlockCollector,
70 com_interface::{
71 ComInterfaceUUID,
72 factory::{
73 CloseAsyncCallback, ComInterfaceConfiguration,
74 SendCallback, SendFailure, SendSuccess, SocketDataIterator,
75 SocketProperties,
76 },
77 properties::InterfaceDirection,
78 },
79 },
80 },
81 utils::{
82 async_iterators::async_next_pin_box,
83 maybe_async::{MaybeAsync, SyncOrAsync, SyncOrAsyncResult},
84 task_manager::TaskManager,
85 },
86};
87use async_select::select;
88use datex_crypto_facade::crypto::Crypto;
89use futures::channel::{oneshot, oneshot::Sender};
90use futures_util::FutureExt;
91use crate::time::now_ms;
92
/// Callback invoked for every block received by the hub.
///
/// It is called with the received block and the UUID of the socket the block
/// arrived on, before the block is processed any further.
pub type IncomingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static>;
95
/// Callback type stored in the hub's list of outgoing block interceptors.
///
/// NOTE(review): the call site is outside this part of the file; presumably
/// the arguments are the block being sent, the socket it is sent through and
/// the receiver endpoints served by that socket — confirm.
pub type OutgoingBlockInterceptor =
    Box<dyn Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static>;
98
/// Communication hub: owns the interface and socket managers and routes DXB
/// blocks between the local endpoint and the registered sockets.
pub struct ComHub {
    /// Endpoint of this hub. It is written as the sender of own blocks and is
    /// compared against receivers to decide whether a block is for this hub.
    pub endpoint: Endpoint,

    /// Hub-wide options, e.g. `default_receive_timeout` used when waiting for
    /// responses.
    pub options: ComHubOptions,

    /// Registry of all sockets and of the endpoints reachable through them.
    socket_manager: ComInterfaceSocketManager,
    /// Registry of the communication interfaces known to this hub.
    interfaces_manager: ComInterfaceManager,

    /// Receives blocks addressed to this hub and keeps the block history.
    pub block_handler: BlockHandler,

    /// Callbacks run for every incoming block (see
    /// `register_incoming_block_interceptor`).
    incoming_block_interceptors: RefCell<Vec<IncomingBlockInterceptor>>,
    /// Callbacks registered through `register_outgoing_block_interceptor`.
    outgoing_block_interceptors: RefCell<Vec<OutgoingBlockInterceptor>>,

    /// Task manager on which the per-interface and per-socket tasks are
    /// registered.
    pub task_manager: TaskManager,
}
116
117impl Debug for ComHub {
118 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
119 f.debug_struct("ComHub")
120 .field("endpoint", &self.endpoint)
121 .field("options", &self.options)
122 .finish()
123 }
124}
125
/// Priority attached to a communication interface when its sockets are
/// registered with the hub.
///
/// The derived ordering follows the declaration order of the variants, so
/// `None` compares less than every `Priority(_)`, and `Priority` values are
/// ordered by their number.
#[derive(
    Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize,
)]
#[cfg_attr(feature = "wasm_runtime", derive(tsify::Tsify))]
pub enum InterfacePriority {
    /// No priority.
    /// NOTE(review): presumably excludes the interface from priority-based
    /// socket selection — confirm against the socket manager.
    None,
    /// Explicit numeric priority. `Priority(0)` is the `Default` value.
    Priority(u16),
}
140
141impl From<Option<u16>> for InterfacePriority {
142 fn from(value: Option<u16>) -> Self {
143 match value {
144 Some(priority) => InterfacePriority::Priority(priority),
145 None => InterfacePriority::default(),
146 }
147 }
148}
149
150impl Default for InterfacePriority {
151 fn default() -> Self {
152 InterfacePriority::Priority(0)
153 }
154}
155
/// Outcome of the synchronous preprocessing step of an incoming block
/// (see `ComHub::receive_block_preprocess`).
pub struct ReceiveBlockPreprocessResult {
    /// Copy of the block re-addressed to the receivers it still has to be
    /// relayed to, or `None` if nothing has to be relayed.
    relayed_block: Option<DXBBlock>,
    /// Copy of the block if it is addressed to this hub and is not a `Hello`
    /// block, otherwise `None`.
    own_received_block: Option<DXBBlock>,
    /// Id of the block if it was not yet in the block history, otherwise
    /// `None`.
    block_id_for_history: Option<BlockId>,
    /// Whether at least one receiver of the block matches this hub.
    is_for_own: bool,
}
162
/// Result of sending a block, resolved either synchronously or through the
/// future `F`.
///
/// As used in this file, the synchronous variant carries
/// `Result<Option<Vec<Vec<u8>>>, Vec<Endpoint>>` and the asynchronous future
/// resolves to `Result<(), Vec<Endpoint>>`; the error lists endpoints that
/// could not be reached.
pub type BlockSendSyncOrAsyncResult<F> =
    SyncOrAsyncResult<Option<Vec<Vec<u8>>>, (), Vec<Endpoint>, F>;
165
/// Boxed future used by `ComHub::prepare_own_block` when the block has to be
/// signed asynchronously; resolves to the prepared block or a `ComHubError`.
pub type PrepareOwnBlockFuture<'a> =
    Pin<Box<dyn Future<Output = Result<DXBBlock, ComHubError>> + 'a>>;
168
/// Return type of `ComHub::prepare_own_block`: the prepared block is
/// available immediately (unsigned blocks) or through a
/// `PrepareOwnBlockFuture` (signed blocks); both paths fail with
/// `ComHubError`.
pub type PrepareOwnBlockResult<'a> = SyncOrAsyncResult<
    DXBBlock,
    DXBBlock,
    ComHubError,
    PrepareOwnBlockFuture<'a>,
>;
175
/// Result of receiving a block: `Ok(Some(block))` if a block has to be handed
/// to the block handler of this hub, `Ok(None)` if there is nothing to hand
/// over, or a `SignatureValidationError` if signature validation failed.
pub type ReceiveBlockResult =
    Result<Option<DXBBlock>, SignatureValidationError>;
178
179impl ComHub {
180 pub fn create(
181 endpoint: impl Into<Endpoint>,
182 incoming_sections_sender: UnboundedSender<IncomingSection>,
183 ) -> (Rc<ComHub>, impl Future<Output = ()>) {
184 let (task_manager, task_future) = TaskManager::create();
185
186 let block_handler = BlockHandler::init(incoming_sections_sender);
187 let com_hub = Rc::new(ComHub {
188 endpoint: endpoint.into(),
189 options: ComHubOptions::default(),
190 block_handler,
191 socket_manager: ComInterfaceSocketManager::new(),
192 interfaces_manager: ComInterfaceManager::default(),
193 incoming_block_interceptors: RefCell::new(Vec::new()),
194 outgoing_block_interceptors: RefCell::new(Vec::new()),
195 task_manager,
196 });
197
198 (com_hub, task_future)
199 }
200
201 pub(crate) fn register_com_interface_handler(
203 self: Rc<Self>,
204 com_interface_configuration: ComInterfaceConfiguration,
205 priority: InterfacePriority,
206 close_receiver: InterfaceCloseReceiver,
207 ) -> Option<impl Future<Output = Result<(), ()>>> {
208 let (socket_ready_senders, socket_ready_fut) =
209 if com_interface_configuration.has_single_socket {
210 let (r, f) = ComHub::get_ready_senders();
211 (Some(r), Some(f))
212 } else {
213 (None, None)
214 };
215
216 self.task_manager
217 .register_task(self.clone().handle_sockets_task(
218 com_interface_configuration,
219 priority,
220 close_receiver,
221 socket_ready_senders,
222 ));
223 socket_ready_fut
224 }
225
226 fn get_ready_senders() -> (
231 (Sender<Result<(), ()>>, Sender<Result<(), ()>>),
232 impl Future<Output = Result<(), ()>>,
233 ) {
234 let (socket_ready_sender, socket_ready_receiver) = oneshot::channel();
235 let (socket_hello_sent_sender, socket_hello_sent_receiver) =
236 oneshot::channel();
237 let fut = async move {
238 match socket_ready_receiver.await {
239 Ok(Ok(())) => match socket_hello_sent_receiver.await {
240 Ok(Ok(())) => Ok(()),
241 Ok(Err(())) => Err(()),
242 Err(_) => Err(()),
243 },
244 Ok(Err(())) => Err(()),
245 Err(_) => Err(()),
246 }
247 };
248 ((socket_ready_sender, socket_hello_sent_sender), fut)
249 }
250
    /// Long-running task that accepts the sockets produced by one
    /// communication interface.
    ///
    /// Every socket yielded by the interface's socket iterator is registered
    /// with the socket manager, and sockets that provide a data iterator get
    /// their own `handle_socket_task`. The task ends when the iterator is
    /// exhausted, yields an error, the interface is no longer registered, or
    /// a close signal arrives on `interface_close_receiver`. Afterwards the
    /// interface's close callback is awaited, the interface is cleaned up if
    /// no sockets are left, and pending readiness senders are failed.
    ///
    /// `ready_senders` is `(socket_ready_sender, hello_sent_sender)` as
    /// created by `get_ready_senders`.
    async fn handle_sockets_task(
        self: Rc<Self>,
        com_interface_configuration: ComInterfaceConfiguration,
        interface_priority: InterfacePriority,
        interface_close_receiver: InterfaceCloseReceiver,
        ready_senders: Option<(Sender<Result<(), ()>>, Sender<Result<(), ()>>)>,
    ) {
        let com_interface_uuid = com_interface_configuration.uuid();
        let mut iterator = com_interface_configuration.new_sockets_iterator;
        let com_interface_properties =
            com_interface_configuration.properties.clone();
        let cleanup_callback = com_interface_configuration.close_async_callback;

        // Split the pair so each sender can be handed out (at most once)
        // independently via `take()`.
        let (mut socket_ready_sender, mut hello_sent_sender) =
            match ready_senders {
                Some((x, y)) => (Some(x), Some(y)),
                None => (None, None),
            };

        // Race the accept loop against the interface close signal.
        // `closed_sender` is `Some` only when the close signal won.
        let closed_sender = select!(
            _ = async {
                while let Some(socket) = async_next_pin_box(&mut iterator).await {

                    // The interface may have been removed while this task was
                    // waiting for the next socket.
                    if !self.interfaces_manager.has_interface(&com_interface_uuid) {
                        info!("Interface {} was removed while waiting for new socket connections, stopping socket handler task", com_interface_uuid);
                        break;
                    }

                    match socket {
                        Ok(socket_configuration) => {
                            let socket_iterator = socket_configuration.iterator;
                            let send_callback = socket_configuration.send_callback;
                            // Shadows the interface-level callback inside this arm only.
                            let cleanup_callback = socket_configuration.close_async_callback;
                            let socket_properties = socket_configuration.properties;
                            let _socket_uuid = socket_properties.uuid();
                            let _socket_direction = socket_properties.direction.clone();
                            let (socket_close_sender, socket_close_receiver) = oneshot::channel();

                            // NOTE(review): the result of `register_socket` is
                            // ignored — confirm that failures need no handling.
                            let _res = self.socket_manager.register_socket(
                                SocketData {
                                    socket_properties: socket_properties.clone(),
                                    interface_uuid: com_interface_uuid.clone(),
                                    interface_properties: com_interface_properties
                                        .clone(),
                                    send_callback,
                                    endpoints: HashSet::new(),
                                    close_sender: Some(socket_close_sender),
                                    // Only the first registered socket receives the sender.
                                    socket_ready_sender: socket_ready_sender.take(),
                                },
                                interface_priority,
                            );
                            let self_clone = self.clone();
                            // Sockets without a data iterator get no socket task; their
                            // close receiver and close callback are dropped here.
                            if let Some(socket_iterator) = socket_iterator {
                                self_clone.task_manager.register_task(
                                    self_clone.clone().handle_socket_task(
                                        socket_properties,
                                        socket_iterator,
                                        com_interface_uuid.clone(),
                                        com_interface_properties.auto_identify,
                                        socket_close_receiver,
                                        cleanup_callback,
                                        // Only the first socket task receives the sender.
                                        hello_sent_sender.take()
                                    ),
                                );
                            }
                        }
                        Err(e) => {
                            error!("Error creating socket from iterator: {:?}", e);
                            break;
                        }
                    }
                }
            } => {
                None
            },
            sender = interface_close_receiver => {
                // NOTE(review): panics if the close channel was dropped
                // without sending — confirm this cannot happen.
                Some(sender.unwrap())
            }
        );

        // Run the interface-level close callback, if one was configured.
        if let Some(cleanup_callback) = cleanup_callback {
            cleanup_callback().await;
        }

        if self.interfaces_manager.has_interface(&com_interface_uuid) {
            // No further sockets will be accepted for this interface.
            self.interfaces_manager
                .set_interface_waiting_for_socket_connections(
                    &com_interface_uuid,
                    false,
                );

            // Without registered sockets the interface is cleaned up right away.
            if !self
                .socket_manager
                .are_sockets_registered_for_interface(&com_interface_uuid)
            {
                self.interfaces_manager
                    .cleanup_interface(&com_interface_uuid)
                    .unwrap();
                info!(
                    "Destroyed interface {} as it has no sockets registered",
                    com_interface_uuid
                );
            }
        }

        // Acknowledge the close request.
        // NOTE(review): panics if the requester already dropped its receiver.
        if let Some(closed_sender) = closed_sender {
            closed_sender.send(()).unwrap();
        }

        // Senders that were never handed to a socket are resolved with an
        // error so that the readiness future does not stay pending.
        if let Some(socket_ready_sender) = socket_ready_sender {
            let _ = socket_ready_sender.send(Err(()));
        }
        if let Some(hello_sent_sender) = hello_sent_sender {
            let _ = hello_sent_sender.send(Err(()));
        }
    }
377
378 async fn send_socket_hello(
380 self: Rc<Self>,
381 socket_uuid: ComInterfaceSocketUUID,
382 socket_direction: InterfaceDirection,
383 auto_identify: bool,
384 hello_sent_sender: Option<Sender<Result<(), ()>>>,
385 ) {
386 let send_hello = socket_direction.can_send() && auto_identify; if send_hello {
389 info!("Saying hello to {}", socket_uuid);
390 if let Err(err) = self.send_hello_block(socket_uuid).await {
391 error!("Failed to send hello block: {:?}", err);
392 hello_sent_sender.map(|sender| sender.send(Err(())));
393 } else {
394 hello_sent_sender.map(|sender| sender.send(Ok(())));
395 }
396 } else {
397 hello_sent_sender.map(|sender| sender.send(Ok(())));
398 }
399 }
400
    /// Long-running task that serves a single socket.
    ///
    /// It registers a separate task that sends the hello block, then feeds
    /// the raw data yielded by `socket_iterator` into a `BlockCollector` and
    /// passes every collected block to `handle_incoming_block_async`. The
    /// task ends when the socket iterator finishes or fails, when the
    /// collector rejects data, or when a close signal arrives on
    /// `close_receiver`. Afterwards the socket's close callback is awaited,
    /// the socket is removed from the socket manager and the owning interface
    /// is cleaned up if it no longer waits for new socket connections.
    async fn handle_socket_task(
        self: Rc<Self>,
        socket_properties: SocketProperties,
        mut socket_iterator: SocketDataIterator,
        com_interface_uuid: ComInterfaceUUID,
        auto_identify: bool,
        close_receiver: SocketCloseReceiver,
        cleanup_callback: Option<CloseAsyncCallback>,
        hello_sent_sender: Option<Sender<Result<(), ()>>>,
    ) {
        info!("start handle socket task");

        // The hello block is sent from its own task so that receiving can
        // start immediately.
        self.task_manager
            .register_task(self.clone().send_socket_hello(
                socket_properties.uuid(),
                socket_properties.direction.clone(),
                auto_identify,
                hello_sent_sender,
            ));

        let (mut bytes_sender, block_iterator) = BlockCollector::create();
        let mut block_iterator = Box::pin(block_iterator);

        // Race the receive loop against the socket close signal.
        // `closed_sender` is `Some` only when the close signal won.
        let closed_sender = select!(
            _ = async {
                loop {
                    select! {
                        // Raw data from the socket is forwarded to the block collector.
                        data = async_next_pin_box(&mut socket_iterator).fuse() => {
                            match data {

                                Some(Ok(data)) => {
                                    if let Err(e) = bytes_sender.start_send(data) {
                                        error!("Error sending data to BlockCollector: {:?}", e);
                                        break;
                                    }
                                }

                                Some(Err(_)) => {
                                    error!("Socket {} closed, removing socket", socket_properties.uuid());
                                    break;
                                }

                                None => {
                                    error!("Socket {} closed (iterator finished), removing socket", socket_properties.uuid());
                                    break;
                                }
                            }
                        },
                        // Complete blocks produced by the collector are processed in place;
                        // no further socket data is polled while a block is being handled.
                        Some(block) = async_next_pin_box(&mut block_iterator).fuse() => {
                            self.clone().handle_incoming_block_async(block, socket_properties.uuid()).await;
                        },

                        complete => break,
                    }
                }
            } => None,
            closed_sender = close_receiver => {
                info!("received socket close signal for socket removing socket");
                // NOTE(review): panics if the close channel was dropped
                // without sending — confirm this cannot happen.
                Some(closed_sender.unwrap())
            }
        );

        // Run the socket-level close callback, if one was configured.
        if let Some(cleanup_callback) = cleanup_callback {
            cleanup_callback().await;
        }

        self.socket_manager
            .cleanup_socket(&socket_properties.uuid());

        if self.interfaces_manager.has_interface(&com_interface_uuid) {
            // NOTE(review): unlike `handle_sockets_task`, this does not check
            // whether other sockets are still registered for the interface
            // before cleaning it up — confirm this is intended for interfaces
            // with several sockets.
            if !self
                .interfaces_manager
                .is_interface_waiting_for_socket_connections(
                    &com_interface_uuid,
                )
            {
                self.interfaces_manager
                    .cleanup_interface(&com_interface_uuid)
                    .unwrap();
                info!(
                    "Destroyed interface {} as it is no longer waiting for socket connections",
                    com_interface_uuid
                );
            }
        }

        // Acknowledge the close request.
        // NOTE(review): panics if the requester already dropped its receiver.
        if let Some(closed_sender) = closed_sender {
            closed_sender.send(()).unwrap();
        }
    }
508
509 async fn handle_incoming_block_async(
511 self: Rc<Self>,
512 block: DXBBlock,
513 socket_uuid: ComInterfaceSocketUUID,
514 ) {
515 let receive_block_result = self
517 .clone()
518 .receive_block(block, socket_uuid)
519 .into_future()
520 .await;
521
522 let own_received_block = match receive_block_result {
523 Ok(own_received_block) => own_received_block,
524 Err(e) => {
525 error!("Failed to validate block signature: {:?}", e);
526 return;
527 }
528 };
529
530 if let Some(own_received_block) = own_received_block {
532 self.block_handler.handle_incoming_block(own_received_block);
533 }
534 }
535
536 pub fn is_local_endpoint_exact(&self, endpoint: &Endpoint) -> bool {
538 &self.endpoint == endpoint || endpoint.is_local()
539 }
540
541 pub fn register_incoming_block_interceptor<F>(&self, interceptor: F)
543 where
544 F: Fn(&DXBBlock, &ComInterfaceSocketUUID) + 'static,
545 {
546 self.incoming_block_interceptors
547 .borrow_mut()
548 .push(Box::new(interceptor));
549 }
550
551 pub fn register_outgoing_block_interceptor<F>(&self, interceptor: F)
553 where
554 F: Fn(&DXBBlock, &ComInterfaceSocketUUID, &[Endpoint]) + 'static,
555 {
556 self.outgoing_block_interceptors
557 .borrow_mut()
558 .push(Box::new(interceptor));
559 }
560
    /// Processes a block received on `socket_uuid`.
    ///
    /// The block is preprocessed synchronously (interceptors, endpoint
    /// registration, split into own and relayed copies). If there is a copy
    /// for this hub, its signature is validated. Trace blocks and relayed
    /// blocks are then handled asynchronously via `receive_block_async`.
    ///
    /// Resolves to `Ok(Some(block))` for a non-trace block addressed to this
    /// hub, `Ok(None)` otherwise, or the signature validation error. The
    /// result is only asynchronous when validation is asynchronous or when a
    /// trace/relayed block has to be handled.
    pub(crate) fn receive_block(
        self: Rc<Self>,
        block: DXBBlock,
        socket_uuid: ComInterfaceSocketUUID,
    ) -> MaybeAsync<ReceiveBlockResult, impl Future<Output = ReceiveBlockResult>>
    {
        let preprocess_result =
            self.receive_block_preprocess(&socket_uuid, block);

        // Owned handle that is moved into the continuation below.
        let self_clone = self.clone();

        // Only the copy addressed to this hub is validated.
        let validation_result = match preprocess_result.own_received_block {
            Some(block) => block
                .validate_signature()
                .map(|validation| validation.map(Some)),
            None => MaybeAsync::Sync(Ok(None)),
        };

        validation_result
            .map(move |validation_result| {
                // A failed validation aborts everything, including relaying.
                let own_received_block = match validation_result {
                    Ok(block) => block,
                    Err(e) => return MaybeAsync::Sync(Err(e)),
                };

                // Trace blocks are handled by the hub itself and are never
                // returned to the caller.
                let (trace_block, own_block) = match own_received_block {
                    Some(block) => {
                        let block_type = block.block_type();
                        match block_type {
                            BlockType::Trace | BlockType::TraceBack => {
                                (Some(block), None)
                            }
                            _ => (None, Some(block)),
                        }
                    }
                    None => (None, None),
                };

                if preprocess_result.relayed_block.is_some()
                    || trace_block.is_some()
                {
                    MaybeAsync::Async(async move {
                        self_clone
                            .receive_block_async(
                                trace_block,
                                preprocess_result.relayed_block,
                                preprocess_result.block_id_for_history,
                                socket_uuid,
                                preprocess_result.is_for_own,
                            )
                            .await;

                        Ok(own_block)
                    })
                }
                else {
                    // NOTE(review): on this path `block_id_for_history` is
                    // not added to the history here — presumably the block
                    // handler records own blocks itself; confirm.
                    MaybeAsync::Sync(Ok(own_block))
                }
            })
            .flatten()
    }
634
635 fn receive_block_preprocess(
637 &self,
638 socket_uuid: &ComInterfaceSocketUUID,
639 block: DXBBlock,
640 ) -> ReceiveBlockPreprocessResult {
641 info!("{} received block: {}", self.endpoint, block);
642
643 for interceptor in self.incoming_block_interceptors.borrow().iter() {
644 interceptor(&block, socket_uuid);
645 }
646
647 let block_type = block.block_header.flags_and_timestamp.block_type();
648
649 let is_new_block = !self.block_handler.is_block_in_history(&block);
651
652 if is_new_block
655 && !self.is_local_endpoint_exact(&block.routing_header.sender)
656 {
657 self.register_socket_endpoint_from_incoming_block(
658 socket_uuid.clone(),
659 &block,
660 );
661 }
662
663 let all_receivers = block.receiver_endpoints();
664 let (relayed_block, own_received_block, is_for_own) =
665 if !all_receivers.is_empty() {
666 let is_for_own = all_receivers.iter().any(|e| {
667 self.is_local_endpoint_exact(e)
668 || e == &Endpoint::ANY
669 || e == &Endpoint::ANY_ALL_INSTANCES
670 });
671
672 let own_received_block =
674 if is_for_own && block_type != BlockType::Hello {
675 info!("Block is for this endpoint");
676
677 Some(block.clone()) } else {
679 None
680 };
681
682 let relay_receivers = {
684 let should_relay =
685 !(
687 is_for_own && block_type == BlockType::Hello
688 );
689
690 if should_relay {
692 let relay_receivers = if is_for_own {
693 self.get_remote_receivers(&all_receivers)
695 } else {
696 all_receivers
697 };
698 if relay_receivers.is_empty() {
699 None
700 } else {
701 Some(relay_receivers)
702 }
703 } else {
704 None
705 }
706 };
707
708 let relayed_block = relay_receivers
709 .map(|receivers| block.clone_with_new_receivers(receivers));
710
711 (relayed_block, own_received_block, is_for_own)
712 } else {
713 (None, None, false)
714 };
715
716 let block_id_for_history = if is_new_block {
718 Some(block.get_block_id())
719 } else {
720 None
721 };
722
723 ReceiveBlockPreprocessResult {
724 relayed_block,
725 own_received_block,
726 block_id_for_history,
727 is_for_own,
728 }
729 }
730
731 pub(crate) async fn receive_block_async(
733 self: Rc<Self>,
734 trace_block: Option<DXBBlock>,
735 relayed_block: Option<DXBBlock>,
736 block_id_for_history: Option<BlockId>,
737 socket_uuid: ComInterfaceSocketUUID,
738 is_for_own: bool,
739 ) {
740 if let Some(block) = trace_block {
742 info!("Handling trace block asynchronously");
743
744 match block.block_type() {
745 BlockType::Trace => {
746 self.handle_trace_block(&block, socket_uuid.clone()).await;
747 }
748 BlockType::TraceBack => {
749 self.handle_trace_back_block(&block, socket_uuid.clone());
750 }
751 _ => unreachable!(), }
753 }
754
755 if let Some(block) = relayed_block {
757 match block.block_type() {
758 BlockType::Trace | BlockType::TraceBack => {
759 self.redirect_trace_block(
760 block,
761 socket_uuid.clone(),
762 is_for_own,
763 )
764 .await;
765 }
766 _ => {
767 self.redirect_block(block, socket_uuid.clone(), is_for_own)
768 .await
769 .unwrap(); }
771 }
772 }
773
774 if let Some(block_id) = block_id_for_history {
776 self.block_handler
777 .add_block_id_to_history(block_id, Some(socket_uuid));
778 }
779 }
780
781 fn get_remote_receivers(
784 &self,
785 receiver_endpoints: &[Endpoint],
786 ) -> Vec<Endpoint> {
787 receiver_endpoints
788 .iter()
789 .filter(|e| !self.is_local_endpoint_exact(e))
790 .cloned()
791 .collect::<Vec<_>>()
792 }
793
794 fn register_socket_endpoint_from_incoming_block(
797 &self,
798 socket_uuid: ComInterfaceSocketUUID,
799 block: &DXBBlock,
800 ) {
801 let mut socket =
802 self.socket_manager.get_socket_by_uuid_mut(&socket_uuid);
803
804 let distance = block.routing_header.distance;
805 let sender = block.routing_header.sender.clone();
806
807 if socket.socket_properties.direct_endpoint.is_none() && distance == 1 {
809 info!(
810 "Setting direct endpoint for socket {}: {}",
811 socket.socket_properties.uuid(),
812 sender
813 );
814 socket.socket_properties.direct_endpoint = Some(sender.clone());
815 }
816 let uuid = socket.socket_properties.uuid().clone();
817
818 drop(socket);
819
820 match self.socket_manager.register_socket_endpoint(
821 uuid,
822 sender.clone(),
823 distance,
824 ) {
825 Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
826 debug!(
827 "Socket already registered for endpoint {sender}",
828 );
829 }
830 Err(error) => {
831 core::panic!("Failed to register socket endpoint {sender}: {error:?}");
832 },
833 Ok(_) => { }
834 }
835 }
836
837 pub(crate) async fn redirect_block(
840 &self,
841 mut block: DXBBlock,
842 incoming_socket: ComInterfaceSocketUUID,
843 forked: bool,
845 ) -> Result<(), Vec<Endpoint>> {
846 let receivers = block.receiver_endpoints();
847
848 let history_block_data =
851 self.block_handler.get_block_data_from_history(&block);
852 if history_block_data.is_some() {
853 for receiver in &receivers {
854 if !self.is_local_endpoint_exact(receiver) {
855 info!(
856 "{}: Adding socket {} to blacklist for receiver {}",
857 self.endpoint, incoming_socket, receiver
858 );
859 self.socket_manager.add_to_endpoint_blocklist(
860 receiver.clone(),
861 &incoming_socket,
862 );
863 }
864 }
865 }
866
867 block.routing_header.distance += 1;
869
870 if block.routing_header.ttl > 1 {
873 block.routing_header.ttl -= 1;
874 }
875 else if block.routing_header.ttl == 1 {
877 block.routing_header.ttl -= 1;
878 warn!("Block TTL expired. Dropping block...");
879 return Ok(());
880 } else {
882 warn!("Block TTL expired. Dropping block...");
883 return Ok(());
884 }
885
886 let mut prefer_incoming_socket_for_bounce_back = false;
887 let res = {
890 if self.is_local_endpoint_exact(&block.routing_header.sender) {
891 prefer_incoming_socket_for_bounce_back =
893 !block.is_bounce_back();
894 Err(receivers.to_vec())
895 } else {
896 let mut excluded_sockets = vec![incoming_socket.clone()];
897 if let Some(BlockHistoryData {
898 original_socket_uuid: Some(original_socket_uuid),
899 }) = &history_block_data
900 {
901 excluded_sockets.push(original_socket_uuid.clone())
902 }
903 self.send_block_async(block.clone(), excluded_sockets, forked)
904 .await
905 }
906 };
907
908 if let Err(unreachable_endpoints) = res {
910 let send_back_socket = if !prefer_incoming_socket_for_bounce_back
914 && let Some(history_block_data) = history_block_data
915 {
916 history_block_data.original_socket_uuid
917 } else {
918 Some(incoming_socket.clone())
919 };
920
921 if let Some(send_back_socket) = send_back_socket {
924 if block.is_bounce_back() && send_back_socket == incoming_socket
926 {
927 warn!(
928 "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
929 self.endpoint
930 );
931 Ok(())
932 } else if let Some(socket) =
933 self.socket_manager.get_socket_by_uuid(&send_back_socket)
934 && socket.socket_properties.direction.can_send()
935 {
936 block.set_bounce_back(true);
937 self
938 .send_block_to_endpoints_via_socket(
939 block,
940 send_back_socket,
941 unreachable_endpoints.clone(),
942 if forked { Some(0) } else { None },
943 )
944 .into_error_future()
945 .await
946 .map_or(Ok(()), |e| {
947 error!(
948 "{}: Failed to send bounce back block to socket: {:?}",
949 self.endpoint, e
950 );
951 Err(unreachable_endpoints)
952 })
953 } else {
954 error!(
955 "Tried to send bounce back block, but cannot send back to incoming socket"
956 );
957 Err(unreachable_endpoints)
958 }
959 }
960 else {
964 self.send_block_async(block, vec![], forked).await.map_or(Ok(()), |e| {
965 error!(
966 "{}: Failed to send bounce back block to socket: {:?}",
967 self.endpoint, e
968 );
969 Err(unreachable_endpoints)
970 })
971 }
972 } else {
973 Ok(())
974 }
975 }
976
    /// Prepares a block created by this hub for sending.
    ///
    /// The sender is set to the hub's endpoint, the creation timestamp to the
    /// current time and the distance to 1. If the routing header requests a
    /// signature, the block is signed first; in that case the result is
    /// asynchronous.
    ///
    /// # Errors
    /// The asynchronous path resolves to
    /// `ComHubError::SignatureCreationError` if any crypto operation fails.
    pub fn prepare_own_block(
        &self,
        mut block: DXBBlock,
    ) -> PrepareOwnBlockResult<'_> {
        /// Stamps sender, creation timestamp and distance on the block.
        /// Always returns `Ok`.
        fn update_sender_and_timestamp(
            mut block: DXBBlock,
            endpoint: Endpoint,
        ) -> Result<DXBBlock, ComHubError> {
            let now = now_ms();
            block.routing_header.sender = endpoint;
            block
                .block_header
                .flags_and_timestamp
                .set_creation_timestamp(now);
            block.routing_header.distance = 1;
            Ok(block)
        }

        match block.routing_header.flags.signature_type() {
            // Unsigned blocks can be prepared synchronously.
            SignatureType::None => SyncOrAsync::Sync(
                update_sender_and_timestamp(block, self.endpoint.clone()),
            ),

            sig_ty => {
                let endpoint = self.endpoint.clone();

                SyncOrAsync::Async(Box::pin(async move {
                    // NOTE(review): a fresh key pair is generated for every
                    // block — confirm this is intended rather than a
                    // placeholder for the endpoint's long-term key.
                    let (pub_key, pri_key) = CryptoImpl::gen_ed25519()
                        .await
                        .map_err(|_| ComHubError::SignatureCreationError)?;

                    // The signature covers sha256(public key || body). Sender
                    // and timestamp are set afterwards and are not covered.
                    let raw_signed =
                        [pub_key.clone(), block.body.clone()].concat();

                    let hashed_signed = CryptoImpl::hash_sha256(&raw_signed)
                        .await
                        .map_err(|_| ComHubError::SignatureCreationError)?;

                    let signature =
                        CryptoImpl::sig_ed25519(&pri_key, &hashed_signed)
                            .await
                            .map_err(|_| ComHubError::SignatureCreationError)?;

                    let sig_bytes: Vec<u8> = match sig_ty {
                        SignatureType::Unencrypted => signature.to_vec(),

                        SignatureType::Encrypted => {
                            // NOTE(review): the key is derived from the public
                            // key and an all-zero 16-byte value, and a second
                            // all-zero 16-byte value is passed to AES-CTR
                            // (presumably salt and IV) — confirm that this
                            // provides the intended confidentiality.
                            let hash =
                                CryptoImpl::hkdf_sha256(&pub_key, &[0u8; 16])
                                    .await
                                    .map_err(|_| {
                                        ComHubError::SignatureCreationError
                                    })?;

                            CryptoImpl::aes_ctr_encrypt(
                                &hash, &[0u8; 16], &signature,
                            )
                            .await
                            .map_err(|_| ComHubError::SignatureCreationError)?
                            .to_vec()
                        }

                        SignatureType::None => unreachable!("handled above"),
                    };

                    // Stored layout: signature bytes followed by the public key.
                    block.signature = Some([sig_bytes, pub_key].concat());
                    update_sender_and_timestamp(block, endpoint)
                }))
            }
        }
    }
1053
1054 pub async fn send_own_block_async(
1056 &self,
1057 mut block: DXBBlock,
1058 ) -> Result<(), Vec<Endpoint>> {
1059 block = self
1060 .prepare_own_block(block)
1061 .into_result()
1062 .await
1063 .unwrap_or_else(|e| {
1064 panic!("Error preparing own block for sending: {:?}", e)
1065 });
1066
1067 self.block_handler
1069 .add_block_id_to_history(block.get_block_id(), None);
1070 self.send_block_async(block, vec![], false).await
1071 }
1072
1073 pub fn send_own_block(
1077 &self,
1078 mut block: DXBBlock,
1079 ) -> Result<Option<Vec<Vec<u8>>>, Vec<Endpoint>> {
1080 let receivers = block.receiver_endpoints();
1081 block = match self.prepare_own_block(block) {
1082 SyncOrAsync::Sync(res) => res.unwrap_or_else(|e| {
1083 panic!("Error preparing own block for sending: {:?}", e)
1084 }),
1085 SyncOrAsync::Async(_) => {
1086 return Err(receivers);
1087 }
1088 };
1089 self.block_handler
1090 .add_block_id_to_history(block.get_block_id(), None);
1091 match self.send_block(block, vec![], false) {
1092 BlockSendSyncOrAsyncResult::Sync(res) => res,
1093 BlockSendSyncOrAsyncResult::Async(_) => Err(receivers),
1094 }
1095 }
1096
1097 pub async fn send_own_block_await_response(
1102 &self,
1103 block: DXBBlock,
1104 options: ResponseOptions,
1105 ) -> Vec<Result<Response, ResponseError>> {
1106 let context_id = block.block_header.context_id;
1107 let section_index = block.block_header.section_index;
1108
1109 let mut rx = self
1110 .block_handler
1111 .register_incoming_block_observer(context_id, section_index);
1112
1113 let has_exact_receiver_count = block.has_exact_receiver_count();
1114 let receivers = block.receiver_endpoints();
1115
1116 let res = self.send_own_block_async(block).await;
1117 let failed_endpoints = res.err().unwrap_or_default();
1118
1119 let timeout = options
1120 .timeout
1121 .unwrap_or_default(self.options.default_receive_timeout);
1122
1123 if has_exact_receiver_count {
1125 if (options.resolution_strategy
1127 == ResponseResolutionStrategy::ReturnOnAnyError
1128 || options.resolution_strategy
1129 == ResponseResolutionStrategy::ReturnOnFirstResult)
1130 && !failed_endpoints.is_empty()
1131 {
1132 return receivers
1134 .iter()
1135 .map(|receiver| {
1136 if failed_endpoints.contains(receiver) {
1137 Err(ResponseError::NotReachable(receiver.clone()))
1138 } else {
1139 Err(ResponseError::EarlyAbort(receiver.clone()))
1140 }
1141 })
1142 .collect::<Vec<_>>();
1143 }
1144
1145 let mut responses = HashMap::new();
1147 let mut missing_response_count = receivers.len();
1148 for receiver in &receivers {
1149 responses.insert(
1150 receiver.clone(),
1151 if failed_endpoints.contains(receiver) {
1152 Err(ResponseError::NotReachable(receiver.clone()))
1153 } else {
1154 Err(ResponseError::NoResponseAfterTimeout(
1155 receiver.clone(),
1156 timeout,
1157 ))
1158 },
1159 );
1160 }
1161 missing_response_count -= failed_endpoints.len();
1163
1164 info!(
1165 "Waiting for responses from receivers {}",
1166 receivers
1167 .iter()
1168 .map(|e| e.to_string())
1169 .collect::<Vec<_>>()
1170 .join(",")
1171 );
1172
1173 let res = task::timeout(timeout, async {
1174 while let Some(section) = rx.next().await {
1175 let mut received_response = false;
1176 let mut sender = section.get_sender();
1178 if let Some(response) = responses.get_mut(&sender) {
1180 if response.is_err() {
1182 *response = Ok(Response::ExactResponse(sender.clone(), section));
1183 missing_response_count -= 1;
1184 info!("Received expected response from {sender}");
1185 received_response = true;
1186 }
1187 else {
1189 error!("Received multiple responses from the same sender: {sender}");
1190 }
1191 }
1192 else if let Some(matches_endpoint) = self.try_match_sender(&mut responses, &sender) {
1194 let response = responses.get_mut(&matches_endpoint).unwrap();
1195 info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1196 sender = sender.any_instance_endpoint();
1197 if response.is_err() {
1199 *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1200 missing_response_count -= 1;
1201 received_response = true;
1202 }
1203 else {
1205 info!("Received multiple resolved responses from the {}", &sender);
1206 }
1207 }
1208 else {
1210 error!("Received response from unexpected sender: {}", &sender);
1211 }
1212
1213 if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1215 for (receiver, response) in responses.iter_mut() {
1217 if receiver != &sender {
1218 *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1219 }
1220 }
1221 break;
1222 }
1223
1224 if missing_response_count == 0 {
1226 break;
1227 }
1228 }
1229 }).await;
1230
1231 if res.is_err() {
1232 error!("Timeout waiting for responses");
1233 }
1234
1235 responses.into_values().collect::<Vec<_>>()
1237 }
1238 else {
1240 let mut responses = vec![];
1241
1242 let res = task::timeout(timeout, async {
1243 let mut rx =
1244 self.block_handler.register_incoming_block_observer(
1245 context_id,
1246 section_index,
1247 );
1248 while let Some(section) = rx.next().await {
1249 let sender = section.get_sender();
1251 info!("Received response from {sender}");
1252 responses.push(Ok(Response::UnspecifiedResponse(section)));
1254
1255 if options.resolution_strategy
1257 == ResponseResolutionStrategy::ReturnOnFirstResult
1258 {
1259 break;
1260 }
1261 }
1262 })
1263 .await;
1264
1265 if res.is_err() {
1266 info!("Timeout waiting for responses");
1267 }
1268
1269 responses
1270 }
1271 }
1272
1273 fn try_match_sender(
1275 &self,
1276 responses: &mut HashMap<Endpoint, Result<Response, ResponseError>>,
1277 sender: &Endpoint,
1278 ) -> Option<Endpoint> {
1279 let matches = gen {
1280 yield sender.any_instance_endpoint();
1282 if self.is_local_endpoint_exact(sender) {
1284 yield Endpoint::LOCAL;
1285 yield Endpoint::LOCAL_ALL_INSTANCES;
1286 }
1287 }
1288 .collect::<Vec<Endpoint>>();
1289 for try_match_sender in matches {
1290 let res = responses.get(&try_match_sender);
1291 if let Some(_response) = res {
1292 return Some(try_match_sender);
1293 }
1294 }
1295 None
1296 }
1297
1298 pub async fn send_block_async(
1302 &self,
1303 block: DXBBlock,
1304 exclude_sockets: Vec<ComInterfaceSocketUUID>,
1305 forked: bool,
1306 ) -> Result<(), Vec<Endpoint>> {
1307 match self.send_block(block, exclude_sockets, forked) {
1308 SyncOrAsyncResult::Sync(res) => {
1309 res.map(|_| ())
1311 }
1312 SyncOrAsyncResult::Async(fut) => fut.await,
1313 }
1314 }
1315
1316 pub fn send_block(
1325 &self,
1326 mut block: DXBBlock,
1327 exclude_sockets: Vec<ComInterfaceSocketUUID>,
1328 forked: bool,
1329 ) -> BlockSendSyncOrAsyncResult<
1330 impl Future<Output = Result<(), Vec<Endpoint>>>,
1331 > {
1332 let outbound_receiver_groups =
1333 self.socket_manager.get_outbound_receiver_groups(
1334 &self.endpoint,
1335 &block.receiver_endpoints(),
1336 exclude_sockets,
1337 );
1338
1339 if outbound_receiver_groups.is_none() {
1340 error!("No outbound receiver groups found for block");
1341 return SyncOrAsyncResult::Sync(Err(vec![]));
1342 }
1343
1344 let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1345
1346 let mut unreachable_endpoints = vec![];
1347
1348 let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1353 Some(0)
1354 } else {
1355 None
1356 };
1357
1358 block.set_bounce_back(false);
1359
1360 let mut results = Vec::new();
1361
1362 for (receiver_socket, endpoints) in outbound_receiver_groups {
1363 if let Some(socket_uuid) = receiver_socket {
1364 results.push((
1365 endpoints.clone(),
1366 self.send_block_to_endpoints_via_socket(
1367 block.clone(),
1368 socket_uuid,
1369 endpoints,
1370 fork_count,
1371 ),
1372 ));
1373 } else {
1374 error!(
1375 "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1376 self.endpoint,
1377 endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1378 );
1379 unreachable_endpoints.extend(endpoints);
1380 }
1381 if let Some(count) = fork_count {
1383 fork_count = Some(count + 1);
1384 }
1385 }
1386
1387 if !unreachable_endpoints.is_empty() {
1389 return SyncOrAsyncResult::Sync(Err(unreachable_endpoints));
1390 }
1391
1392 if results
1394 .iter()
1395 .all(|(_, res)| matches!(res, SyncOrAsync::Sync(_)))
1396 {
1397 let mut received_blocks = Vec::new();
1398 for (endpoints, res) in results {
1399 match res {
1400 SyncOrAsync::Sync(r) => {
1401 match r {
1402 Ok(Some(data)) => {
1403 received_blocks.push(data); }
1405 Ok(None) => { }
1406 Err(_) => {
1407 unreachable_endpoints.extend(endpoints);
1408 }
1409 }
1410 }
1411 _ => unreachable!(),
1412 }
1413 }
1414 if !unreachable_endpoints.is_empty() {
1415 SyncOrAsyncResult::Sync(Err(unreachable_endpoints))
1416 } else {
1417 SyncOrAsyncResult::Sync(Ok(Some(received_blocks)))
1418 }
1419 }
1420 else {
1422 SyncOrAsyncResult::Async(async move {
1423 let futures =
1424 results.into_iter().map(|(endpoints, res)| async move {
1425 match res {
1426 SyncOrAsync::Sync(r) => {
1427 r.map(|_data| ()).map_err(|_| endpoints)
1429 }
1430 SyncOrAsync::Async(fut) => {
1431 fut.await.map_err(|_| endpoints)
1432 }
1433 }
1434 });
1435
1436 let res = futures::future::join_all(futures).await;
1437 let all_unreachable_endpoints = res
1439 .into_iter()
1440 .filter_map(|r| r.err())
1441 .flatten()
1442 .collect::<Vec<_>>();
1443 if !all_unreachable_endpoints.is_empty() {
1444 Err(all_unreachable_endpoints)
1445 } else {
1446 Ok(())
1447 }
1448 })
1449 }
1450 }
1451
1452 fn send_block_to_endpoints_via_socket(
1455 &self,
1456 mut block: DXBBlock,
1457 socket_uuid: ComInterfaceSocketUUID,
1458 endpoints: Vec<Endpoint>,
1459 fork_count: Option<usize>,
1461 ) -> SyncOrAsyncResult<
1462 Option<Vec<u8>>,
1463 (),
1464 SendFailure,
1465 impl Future<Output = Result<(), SendFailure>>,
1466 > {
1467 block.set_receivers(&endpoints);
1468
1469 let socket_data =
1470 match self.socket_manager.get_socket_by_uuid(&socket_uuid) {
1471 Some(socket_data) => socket_data,
1472 None => {
1473 return SyncOrAsyncResult::Sync(Err(SendFailure(
1474 Box::new(block),
1475 )));
1476 }
1477 };
1478
1479 if block.is_bounce_back() {
1482 block.routing_header.distance -= 2;
1483 }
1484
1485 match block.block_header.flags_and_timestamp.block_type() {
1489 BlockType::Trace | BlockType::TraceBack => {
1490 let distance = block.routing_header.distance;
1491 let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1492 let bounce_back = block.is_bounce_back();
1493
1494 self.add_hop_to_block_trace_data(
1495 &mut block,
1496 NetworkTraceHop {
1497 endpoint: self.endpoint.clone(),
1498 distance,
1499 socket: NetworkTraceHopSocket::new(
1500 &socket_data.interface_properties,
1501 socket_uuid.clone(),
1502 ),
1503 direction: NetworkTraceHopDirection::Outgoing,
1504 fork_nr: new_fork_nr,
1505 bounce_back,
1506 },
1507 );
1508 }
1509 _ => {}
1510 }
1511
1512 let is_broadcast = endpoints
1513 .iter()
1514 .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1515
1516 if is_broadcast
1518 && let Some(direct_endpoint) =
1519 &socket_data.socket_properties.direct_endpoint
1520 && self.is_local_endpoint_exact(direct_endpoint)
1521 {
1522 return SyncOrAsyncResult::Sync(Ok(None));
1523 }
1524 for interceptor in self.outgoing_block_interceptors.borrow().iter() {
1525 interceptor(&block, &socket_uuid, &endpoints);
1526 }
1527 info!(
1528 "Sending block to socket {}: {}",
1529 socket_uuid,
1530 endpoints.iter().map(|e| e.to_string()).join(", ")
1531 );
1532
1533 if let Some(send_callback) = socket_data.send_callback.clone() {
1535 match send_callback {
1536 SendCallback::Sync(callback)
1537 | SendCallback::SyncOnce(callback) => SyncOrAsyncResult::Sync(
1538 callback(block).map(|send_success| match send_success {
1539 SendSuccess::SentWithNewIncomingData(data) => {
1540 Some(data)
1541 }
1542 _ => None,
1543 }),
1544 ),
1545 SendCallback::Async(callback) => {
1546 SyncOrAsyncResult::Async(async move {
1547 callback.call(block).await.map(|_| ())
1548 })
1549 }
1550 }
1551 } else {
1552 panic!("No send callback registered for socket {}", socket_uuid);
1553 }
1554 }
1555
1556 pub async fn send_hello_block(
1559 &self,
1560 socket_uuid: ComInterfaceSocketUUID,
1561 ) -> Result<(), SendFailure> {
1562 let mut block: DXBBlock = DXBBlock::default();
1563 block
1564 .block_header
1565 .flags_and_timestamp
1566 .set_block_type(BlockType::Hello);
1567 let block = self
1570 .prepare_own_block(block)
1571 .into_result()
1572 .await
1573 .unwrap_or_else(|e| {
1574 panic!("Error preparing own block for sending: {:?}", e)
1575 });
1576
1577 match self
1578 .send_block_to_endpoints_via_socket(
1579 block,
1580 socket_uuid.clone(),
1581 vec![Endpoint::ANY],
1582 None,
1583 )
1584 .into_future()
1585 .await
1586 {
1587 SyncOrAsyncResolved::Sync(r) => r.map(|_| ()),
1588 SyncOrAsyncResolved::Async(fut) => fut,
1589 }
1590 }
1591
1592 pub fn clear_endpoint_blacklist(&self) {
1593 self.socket_manager
1594 .endpoint_sockets_blacklist
1595 .borrow_mut()
1596 .clear();
1597 }
1598
1599 pub fn interfaces_manager(&self) -> &ComInterfaceManager {
1600 &self.interfaces_manager
1601 }
1602
1603 pub fn socket_manager(&self) -> &ComInterfaceSocketManager {
1604 &self.socket_manager
1605 }
1606}
1607
1608#[cfg(test)]
1609#[cfg(feature = "crypto_enabled")]
1610pub mod tests {
1611 use crate::{
1612 channel::mpsc::{
1613 UnboundedReceiver, UnboundedSender, create_unbounded_channel,
1614 },
1615 global::{
1616 dxb_block::{DXBBlock, IncomingSection},
1617 protocol_structures::{
1618 block_header::{BlockHeader, FlagsAndTimestamp},
1619 routing_header::RoutingHeader,
1620 },
1621 },
1622 network::{
1623 com_hub::{
1624 ComHub, InterfacePriority,
1625 errors::ComInterfaceCreateError,
1626 managers::com_interface_manager::{
1627 ComInterfaceAsyncFactoryResult, ComInterfaceManager,
1628 },
1629 test_utils::{
1630 TEST_ENDPOINT_A, TEST_ENDPOINT_B, TEST_ENDPOINT_C,
1631 get_endpoints_from_com_hub_metadata,
1632 run_with_coupled_com_hubs,
1633 },
1634 },
1635 com_interfaces::com_interface::{
1636 factory::{
1637 ComInterfaceAsyncFactory, ComInterfaceConfiguration,
1638 ComInterfaceSyncFactory, SendCallback, SendSuccess,
1639 SocketConfiguration, SocketProperties,
1640 },
1641 properties::{ComInterfaceProperties, InterfaceDirection},
1642 },
1643 },
1644 prelude::*,
1645 std_sync::Mutex,
1646 values::core_values::endpoint::Endpoint,
1647 };
1648 use alloc::rc::Rc;
1649 use async_select::select;
1650 use futures_util::FutureExt;
1651 use log::info;
1652 use serde::Deserialize;
1653 use tokio::task::yield_now;
1654
1655 async fn run_with_com_hub<AppReturn, AppFuture>(
1657 app_logic: impl FnOnce(
1658 Rc<ComHub>,
1659 UnboundedReceiver<IncomingSection>,
1660 ) -> AppFuture,
1661 ) -> AppReturn
1662 where
1663 AppFuture: Future<Output = AppReturn>,
1664 {
1665 let (sender, receiver) = create_unbounded_channel();
1666 let (com_hub, com_hub_future) =
1667 ComHub::create(TEST_ENDPOINT_A.clone(), sender);
1668 select! {
1669 app_result = app_logic(com_hub, receiver).fuse() => app_result,
1670 _ = com_hub_future.fuse() => panic!("ComHub future should not complete during the test"),
1671 }
1672 }
1673
1674 async fn add_proxy_interface_to_com_hub(
1675 com_hub: Rc<ComHub>,
1676 endpoint: Endpoint,
1677 ) -> (UnboundedSender<Vec<u8>>, UnboundedReceiver<DXBBlock>) {
1678 let (outgoing_block_sender, outgoing_block_receiver) =
1679 create_unbounded_channel();
1680 let (incoming_data_sender, mut incoming_data_receiver) =
1681 create_unbounded_channel::<Vec<u8>>();
1682 let outgoing_block_sender = Rc::new(Mutex::new(outgoing_block_sender));
1683
1684 let proxy_interface_configuration =
1685 ComInterfaceConfiguration::new_single_socket(
1686 ComInterfaceProperties {
1687 interface_type: "proxy".to_string(),
1688 channel: "proxy".to_string(),
1689 name: Some("proxy".to_string()),
1690 ..Default::default()
1691 },
1692 SocketConfiguration::new_in_out(
1693 SocketProperties::new_with_direct_endpoint(
1694 InterfaceDirection::InOut,
1695 1,
1696 endpoint,
1697 ),
1698 async gen move {
1699 while let Some(block) =
1700 incoming_data_receiver.next().await
1701 {
1702 yield Ok(block)
1703 }
1704 },
1705 SendCallback::new_sync(move |block| {
1706 outgoing_block_sender
1707 .try_lock()
1708 .unwrap()
1709 .start_send(block)
1710 .unwrap();
1711 Ok(SendSuccess::Sent)
1712 }),
1713 ),
1714 );
1715 com_hub
1716 .clone()
1717 .add_interface_from_configuration(
1718 proxy_interface_configuration,
1719 InterfacePriority::None,
1720 )
1721 .unwrap()
1722 .unwrap()
1723 .await
1724 .unwrap();
1725
1726 (incoming_data_sender, outgoing_block_receiver)
1727 }
1728
1729 async fn run_with_com_hub_and_proxy_interface<AppReturn, AppFuture>(
1730 app_logic: impl FnOnce(
1731 Rc<ComHub>,
1732 UnboundedSender<Vec<u8>>,
1733 UnboundedReceiver<DXBBlock>,
1734 UnboundedReceiver<IncomingSection>,
1735 ) -> AppFuture,
1736 ) -> AppReturn
1737 where
1738 AppFuture: Future<Output = AppReturn>,
1739 {
1740 run_with_com_hub(|com_hub, incoming_sections_receiver| async move {
1741 let (incoming_data_sender, outgoing_block_receiver) =
1742 add_proxy_interface_to_com_hub(
1743 com_hub.clone(),
1744 TEST_ENDPOINT_B.clone(),
1745 )
1746 .await;
1747 app_logic(
1748 com_hub,
1749 incoming_data_sender,
1750 outgoing_block_receiver,
1751 incoming_sections_receiver,
1752 )
1753 .await
1754 })
1755 .await
1756 }
1757
1758 async fn send_blocks_to_endpoint(
1759 com_hub: Rc<ComHub>,
1760 incoming_data_sender: &mut UnboundedSender<Vec<u8>>,
1761 endpoint: Endpoint,
1762 blocks: &mut Vec<DXBBlock>,
1763 ) {
1764 for mut block in blocks {
1765 block.set_receivers(vec![endpoint.clone()]);
1766
1767 *block = com_hub
1768 .prepare_own_block(block.clone())
1769 .into_result()
1770 .await
1771 .unwrap();
1772
1773 let block_bytes = block.to_bytes();
1774 incoming_data_sender
1775 .start_send(block_bytes.as_slice().to_vec())
1776 .unwrap();
1777 }
1778 }
1779
/// Selects which kind of `IncomingSection` a test helper should accept.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Default)]
pub enum CollectedBlockType {
    /// Accept any section (the default).
    #[default]
    All,
    /// Accept only `IncomingSection::SingleBlock` sections.
    /// The spelling of this variant is kept as-is because it is a public name.
    SingleBocks,
    /// Accept only `IncomingSection::BlockStream` sections.
    BlockStream,
}
1787
1788 impl CollectedBlockType {
1789 pub fn matches_section(&self, section: &IncomingSection) -> bool {
1790 match self {
1791 CollectedBlockType::SingleBocks => {
1792 matches!(section, IncomingSection::SingleBlock(_))
1793 }
1794 CollectedBlockType::BlockStream => {
1795 matches!(section, IncomingSection::BlockStream(_))
1796 }
1797 CollectedBlockType::All => true,
1798 }
1799 }
1800 }
1801
1802 pub async fn get_collected_received_blocks_from_receiver(
1803 sections_receiver: &mut UnboundedReceiver<IncomingSection>,
1804 collected_type: CollectedBlockType,
1805 count: usize,
1806 ) -> Vec<DXBBlock> {
1807 let mut blocks = vec![];
1808
1809 let mut received_count = 0;
1810
1811 while let Some(section) = sections_receiver.next().await {
1812 if !collected_type.matches_section(§ion) {
1813 panic!(
1814 "Received section does not match collected block type {:?}",
1815 collected_type
1816 );
1817 }
1818
1819 match section {
1820 IncomingSection::SingleBlock((Some(block), ..)) => {
1821 blocks.push(block.clone());
1822 received_count += 1;
1823 info!("Received single block");
1824 }
1825 IncomingSection::BlockStream((Some(mut block_stream), ..)) => {
1826 info!("[START] block stream");
1827 while let Some(block) = block_stream.next().await {
1828 received_count += 1;
1829 blocks.push(block.clone());
1830 info!("Received block from stream");
1831
1832 if received_count >= count {
1833 break;
1834 }
1835 }
1836 info!("[END] receiving block stream");
1837 }
1838 _ => {
1839 panic!("Received section does not contain a block");
1840 }
1841 }
1842
1843 if received_count >= count {
1844 break;
1845 }
1846 }
1847
1848 if blocks.len() != count {
1849 panic!(
1850 "Expected to receive {} blocks, but got {}",
1851 count,
1852 blocks.len()
1853 );
1854 }
1855
1856 blocks
1857 }
1858
1859 #[derive(Debug, Clone, Deserialize)]
1860 struct MockupInterfaceSetupData {
1861 pub name: String,
1862 }
1863 impl MockupInterfaceSetupData {
1864 pub fn new(name: &str) -> Self {
1865 Self {
1866 name: name.to_string(),
1867 }
1868 }
1869 }
1870
1871 impl ComInterfaceSyncFactory for MockupInterfaceSetupData {
1872 fn create_interface(
1873 self,
1874 ) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError>
1875 {
1876 Ok(ComInterfaceConfiguration::new_single_socket(
1877 ComInterfaceProperties::default(),
1878 SocketConfiguration::new_in_out(
1879 SocketProperties::new(InterfaceDirection::InOut, 1),
1880 async gen move {
1881 loop {
1882 yield Ok(vec![]);
1883 }
1884 },
1885 SendCallback::new_sync(|_| Ok(SendSuccess::Sent)),
1886 ),
1887 ))
1888 }
1889
1890 fn get_default_properties() -> ComInterfaceProperties {
1891 ComInterfaceProperties {
1892 name: Some("mockup".to_string()),
1893 ..Default::default()
1894 }
1895 }
1896 }
1897
1898 impl ComInterfaceAsyncFactory for MockupInterfaceSetupData {
1899 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
1900 Box::pin(
1901 async move { ComInterfaceSyncFactory::create_interface(self) },
1902 )
1903 }
1904 fn get_default_properties() -> ComInterfaceProperties {
1905 <MockupInterfaceSetupData as ComInterfaceSyncFactory>::get_default_properties()
1906 }
1907 }
1908
1909 #[tokio::test]
1910 pub async fn create_from_sync_factory() {
1911 run_with_com_hub(|com_hub, _| async move {
1912 let interface_configuration =
1913 ComInterfaceManager::create_interface_sync_from_setup_data(
1914 MockupInterfaceSetupData::new("test"),
1915 )
1916 .unwrap();
1917 let uuid = interface_configuration.uuid().clone();
1918
1919 com_hub
1920 .clone()
1921 .add_interface_from_configuration(
1922 interface_configuration,
1923 InterfacePriority::default(),
1924 )
1925 .unwrap();
1926
1927 assert!(com_hub.remove_interface(uuid).await.is_ok());
1928 })
1929 .await;
1930 }
1931
1932 #[tokio::test]
1933 pub async fn create_from_async_factory() {
1934 run_with_com_hub(|com_hub, _| async move {
1935 let interface_configuration =
1936 ComInterfaceManager::create_interface_async_from_setup_data(
1937 MockupInterfaceSetupData::new("test"),
1938 )
1939 .await
1940 .unwrap();
1941 let uuid = interface_configuration.uuid().clone();
1942
1943 com_hub
1944 .clone()
1945 .add_interface_from_configuration(
1946 interface_configuration,
1947 InterfacePriority::default(),
1948 )
1949 .unwrap();
1950
1951 assert!(com_hub.remove_interface(uuid).await.is_ok());
1952 })
1953 .await;
1954 }
1955
1956 #[tokio::test]
1957 async fn create_hello_connection() {
1958 run_with_coupled_com_hubs(async |a, b| {
1959 let com_hub_a_sockets =
1960 get_endpoints_from_com_hub_metadata(a.com_hub.get_metadata());
1961 assert!(
1962 com_hub_a_sockets
1963 .contains(&(Some(TEST_ENDPOINT_B.clone()), Some(1)))
1964 );
1965
1966 let com_hub_b_sockets =
1967 get_endpoints_from_com_hub_metadata(b.com_hub.get_metadata());
1968 assert!(
1969 com_hub_b_sockets
1970 .contains(&(Some(TEST_ENDPOINT_A.clone()), Some(1)))
1971 );
1972 })
1973 .await;
1974 }
1975
1976 #[tokio::test]
1977 pub async fn test_send() {
1978 run_with_com_hub_and_proxy_interface(
1979 async move |com_hub, _, mut outgoing_block_receiver, _| {
1980 let mut block = DXBBlock::new_with_body(b"Hello world!");
1982 block.set_receivers(vec![TEST_ENDPOINT_B.clone()]);
1983 com_hub.send_own_block_async(block).await.unwrap();
1984
1985 outgoing_block_receiver.next().await.unwrap();
1987
1988 let outgoing_block =
1990 outgoing_block_receiver.next().await.unwrap();
1991 assert_eq!(outgoing_block.body, b"Hello world!");
1992 },
1993 )
1994 .await;
1995 }
1996
1997 #[tokio::test]
1998 pub async fn test_send_between_com_hubs() {
1999 run_with_coupled_com_hubs(async |a, mut b| {
2000 let mut block = DXBBlock::new_with_body(b"Hello world!");
2002 block.set_receivers(vec![TEST_ENDPOINT_B.clone()]);
2003 a.com_hub.send_own_block_async(block).await.unwrap();
2004
2005 let next_block = b.incoming_sections_receiver.next().await.unwrap();
2007 if let IncomingSection::SingleBlock((Some(block), _)) = next_block {
2008 assert_eq!(block.body, b"Hello world!");
2009 } else {
2010 panic!("Expected single block section");
2011 }
2012 })
2013 .await;
2014 }
2015
2016 #[tokio::test]
2017 pub async fn test_send_block_to_invalid_receiver() {
2018 run_with_com_hub_and_proxy_interface(async move |com_hub, _, _, _| {
2019 let mut block = DXBBlock::new_with_body(b"Hello world!");
2020 block.set_receivers(vec![TEST_ENDPOINT_C.clone()]);
2022 let res = com_hub.send_own_block_async(block).await;
2023 assert!(res.is_err());
2024 })
2025 .await;
2026 }
2027
2028 #[tokio::test]
2029 pub async fn send_block_via_multiple_interfaces() {
2030 run_with_com_hub(|com_hub, _| async move {
2031 let (_sender_b, mut outgoing_block_receiver_b) =
2032 add_proxy_interface_to_com_hub(
2033 com_hub.clone(),
2034 TEST_ENDPOINT_B.clone(),
2035 )
2036 .await;
2037 let (_sender_c, mut outgoing_block_receiver_c) =
2038 add_proxy_interface_to_com_hub(
2039 com_hub.clone(),
2040 TEST_ENDPOINT_C.clone(),
2041 )
2042 .await;
2043 com_hub.print_metadata();
2044
2045 let mut block = DXBBlock::new_with_body(b"Hello world!");
2046 block.set_receivers(vec![
2047 TEST_ENDPOINT_B.clone(),
2048 TEST_ENDPOINT_C.clone(),
2049 ]);
2050 com_hub.send_own_block_async(block).await.unwrap();
2051
2052 outgoing_block_receiver_b.next().await.unwrap();
2054 outgoing_block_receiver_c.next().await.unwrap();
2055
2056 let outgoing_block_b =
2058 outgoing_block_receiver_b.next().await.unwrap();
2059 let outgoing_block_c =
2060 outgoing_block_receiver_c.next().await.unwrap();
2061
2062 info!("block sender b: {}", outgoing_block_b.sender());
2063 info!("block sender c: {}", outgoing_block_c.sender());
2064 assert_eq!(outgoing_block_b.body, b"Hello world!");
2065 assert_eq!(outgoing_block_c.body, b"Hello world!");
2066 })
2067 .await
2068 }
2069
2070 #[tokio::test]
2071 pub async fn test_receive() {
2072 flexi_logger::init();
2073 run_with_com_hub_and_proxy_interface(
2074 async move |com_hub,
2075 mut incoming_data_sender,
2076 _,
2077 mut incoming_sections_receiver| {
2078 let mut block = DXBBlock::new_with_body(b"Hello world!");
2080 block.set_receivers(vec![TEST_ENDPOINT_A.clone()]);
2081
2082 let block = com_hub
2083 .prepare_own_block(block)
2084 .into_result()
2085 .await
2086 .unwrap();
2087
2088 let block_bytes = block.to_bytes();
2089 incoming_data_sender
2090 .start_send(block_bytes.as_slice().to_vec())
2091 .unwrap();
2092
2093 let incoming_section =
2094 incoming_sections_receiver.next().await.unwrap();
2095 if let IncomingSection::SingleBlock((Some(block), _)) =
2096 incoming_section
2097 {
2098 assert_eq!(block.raw_bytes.clone().unwrap(), block_bytes);
2099 } else {
2100 panic!("Expected single block section");
2101 }
2102 },
2103 )
2104 .await;
2105 }
2106
2107 #[tokio::test]
2108 pub async fn test_receive_multiple_blocks_single_section() {
2109 run_with_com_hub_and_proxy_interface(
2110 async move |com_hub,
2111 mut incoming_data_sender,
2112 _,
2113 mut incoming_sections_receiver| {
2114 let mut blocks = vec![
2115 DXBBlock {
2116 routing_header: RoutingHeader::default(),
2117 block_header: BlockHeader {
2118 section_index: 0,
2119 block_number: 0,
2120 flags_and_timestamp: FlagsAndTimestamp::new()
2121 .with_is_end_of_section(false)
2122 .with_is_end_of_context(false),
2123 ..Default::default()
2124 },
2125 ..Default::default()
2126 },
2127 DXBBlock {
2128 routing_header: RoutingHeader::default(),
2129 block_header: BlockHeader {
2130 section_index: 0,
2131 block_number: 1,
2132 flags_and_timestamp: FlagsAndTimestamp::new()
2133 .with_is_end_of_section(false)
2134 .with_is_end_of_context(false),
2135 ..Default::default()
2136 },
2137 ..Default::default()
2138 },
2139 DXBBlock {
2140 routing_header: RoutingHeader::default(),
2141 block_header: BlockHeader {
2142 section_index: 0,
2143 block_number: 2,
2144 flags_and_timestamp: FlagsAndTimestamp::new()
2145 .with_is_end_of_section(true)
2146 .with_is_end_of_context(true),
2147 ..Default::default()
2148 },
2149 ..Default::default()
2150 },
2151 ];
2152 let blocks_count = blocks.len();
2153
2154 send_blocks_to_endpoint(
2155 com_hub.clone(),
2156 &mut incoming_data_sender,
2157 TEST_ENDPOINT_A.clone(),
2158 &mut blocks,
2159 )
2160 .await;
2161
2162 let incoming_blocks =
2163 get_collected_received_blocks_from_receiver(
2164 &mut incoming_sections_receiver,
2165 CollectedBlockType::BlockStream,
2166 blocks_count,
2167 )
2168 .await;
2169
2170 for (incoming_block, block) in
2171 incoming_blocks.iter().zip(blocks.iter())
2172 {
2173 assert_eq!(
2174 incoming_block.raw_bytes.clone().unwrap(),
2175 block.to_bytes()
2176 );
2177 }
2178 },
2179 )
2180 .await;
2181 }
2182
2183 #[tokio::test]
2184 pub async fn test_receive_multiple_separate_blocks() {
2185 run_with_com_hub_and_proxy_interface(
2186 async move |com_hub,
2187 mut incoming_data_sender,
2188 _,
2189 mut incoming_sections_receiver| {
2190 let mut blocks = vec![
2191 DXBBlock {
2192 routing_header: RoutingHeader::default(),
2193 block_header: BlockHeader {
2194 section_index: 1,
2195 block_number: 0,
2196 ..Default::default()
2197 },
2198 ..Default::default()
2199 },
2200 DXBBlock {
2201 routing_header: RoutingHeader::default(),
2202 block_header: BlockHeader {
2203 section_index: 2,
2204 block_number: 0,
2205 ..Default::default()
2206 },
2207 ..Default::default()
2208 },
2209 DXBBlock {
2210 routing_header: RoutingHeader::default(),
2211 block_header: BlockHeader {
2212 section_index: 3,
2213 block_number: 0,
2214 ..Default::default()
2215 },
2216 ..Default::default()
2217 },
2218 ];
2219 let blocks_count = blocks.len();
2220
2221 send_blocks_to_endpoint(
2222 com_hub.clone(),
2223 &mut incoming_data_sender,
2224 TEST_ENDPOINT_A.clone(),
2225 &mut blocks,
2226 )
2227 .await;
2228
2229 let incoming_blocks =
2230 get_collected_received_blocks_from_receiver(
2231 &mut incoming_sections_receiver,
2232 CollectedBlockType::SingleBocks,
2233 blocks_count,
2234 )
2235 .await;
2236
2237 for (incoming_block, block) in
2238 incoming_blocks.iter().zip(blocks.iter())
2239 {
2240 assert_eq!(
2241 incoming_block.raw_bytes.clone().unwrap(),
2242 block.to_bytes()
2243 );
2244 }
2245 },
2246 )
2247 .await;
2248 }
2249
2250 }