1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::SignatureType;
3use crate::stdlib::{cell::RefCell, rc::Rc};
4use crate::task::{self, sleep, spawn_with_panic_notify};
5use crate::utils::time::Time;
6
7use futures::channel::oneshot::Sender;
8use futures_util::StreamExt;
9use itertools::Itertools;
10use log::{debug, error, info, warn};
11use std::cmp::PartialEq;
12use std::collections::{HashMap, HashSet};
13use std::fmt::{Debug, Display, Formatter};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16#[cfg(feature = "tokio_runtime")]
17use tokio::task::yield_now;
18use super::com_interfaces::com_interface::{
21 self, ComInterfaceError, ComInterfaceState
22};
23use super::com_interfaces::{
24 com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
25};
26use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
27use crate::global::dxb_block::{DXBBlock, IncomingSection};
28use crate::network::block_handler::{BlockHandler, BlockHistoryData};
29use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
30use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
31use crate::network::com_interfaces::com_interface_properties::{
32 InterfaceDirection, ReconnectionConfig,
33};
34use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
35use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
36use crate::values::value_container::ValueContainer;
37
38#[derive(Debug, Clone)]
39pub struct DynamicEndpointProperties {
40 pub known_since: u64,
41 pub distance: i8,
42 pub is_direct: bool,
43 pub channel_factor: u32,
44 pub direction: InterfaceDirection,
45}
46
47pub type ComInterfaceFactoryFn =
48 fn(
49 setup_data: ValueContainer,
50 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
51
52#[derive(Debug)]
53pub struct ComHubOptions {
54 default_receive_timeout: Duration,
55}
56
57impl Default for ComHubOptions {
58 fn default() -> Self {
59 ComHubOptions {
60 default_receive_timeout: Duration::from_secs(5),
61 }
62 }
63}
64
65type SocketMap = HashMap<
66 ComInterfaceSocketUUID,
67 (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
68>;
69type InterfaceMap = HashMap<
70 ComInterfaceUUID,
71 (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
72>;
73
74pub struct ComHub {
75 pub endpoint: Endpoint,
77
78 pub options: ComHubOptions,
80
81 pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,
83
84 pub interfaces: RefCell<InterfaceMap>,
86
87 pub sockets: RefCell<SocketMap>,
90
91 pub endpoint_sockets_blacklist:
93 RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,
94
95 pub fallback_sockets:
98 RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,
99
100 pub endpoint_sockets: RefCell<
103 HashMap<
104 Endpoint,
105 Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
106 >,
107 >,
108
109 update_loop_running: RefCell<bool>,
112 update_loop_stop_sender: RefCell<Option<Sender<()>>>,
113
114 pub block_handler: BlockHandler,
115}
116
117impl Debug for ComHub {
118 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct("ComHub")
120 .field("endpoint", &self.endpoint)
121 .field("options", &self.options)
122 .field("sockets", &self.sockets)
123 .field(
124 "endpoint_sockets_blacklist",
125 &self.endpoint_sockets_blacklist,
126 )
127 .field("fallback_sockets", &self.fallback_sockets)
128 .field("endpoint_sockets", &self.endpoint_sockets)
129 .finish()
130 }
131}
132
133#[derive(Debug, Clone, Default)]
134struct EndpointIterateOptions<'a> {
135 pub only_direct: bool,
136 pub exact_instance: bool,
137 pub exclude_sockets: &'a [ComInterfaceSocketUUID],
138}
139
140impl Default for ComHub {
141 fn default() -> Self {
142 ComHub {
143 endpoint: Endpoint::default(),
144 options: ComHubOptions::default(),
145 interface_factories: RefCell::new(HashMap::new()),
146 interfaces: RefCell::new(HashMap::new()),
147 endpoint_sockets: RefCell::new(HashMap::new()),
148 block_handler: BlockHandler::new(),
149 sockets: RefCell::new(HashMap::new()),
150 fallback_sockets: RefCell::new(Vec::new()),
151 endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
152 update_loop_running: RefCell::new(false),
153 update_loop_stop_sender: RefCell::new(None),
154 }
155 }
156}
157
158#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
159pub enum InterfacePriority {
160 None,
164 Priority(u16),
168}
169
170impl Default for InterfacePriority {
171 fn default() -> Self {
172 InterfacePriority::Priority(0)
173 }
174}
175
176#[derive(Debug, Clone, PartialEq)]
177pub enum ComHubError {
178 InterfaceError(ComInterfaceError),
179 InterfaceCloseFailed,
180 InterfaceNotConnected,
181 InterfaceDoesNotExist,
182 InterfaceAlreadyExists,
183 InterfaceTypeDoesNotExist,
184 InvalidInterfaceDirectionForFallbackInterface,
185 NoResponse,
186 InterfaceOpenError,
187}
188
189#[derive(Debug)]
190pub enum SocketEndpointRegistrationError {
191 SocketDisconnected,
192 SocketUninitialized,
193 SocketEndpointAlreadyRegistered,
194}
195
196impl ComHub {
197 pub fn new(endpoint: impl Into<Endpoint>) -> ComHub {
198 ComHub {
199 endpoint: endpoint.into(),
200 ..ComHub::default()
201 }
202 }
203
204 pub async fn init(&self) -> Result<(), ComHubError> {
205 let local_interface = LocalLoopbackInterface::new();
207 self.open_and_add_interface(
208 Rc::new(RefCell::new(local_interface)),
209 InterfacePriority::None,
210 )
211 .await
212 }
213
214 pub fn register_interface_factory(
217 &self,
218 interface_type: String,
219 factory: ComInterfaceFactoryFn,
220 ) {
221 self.interface_factories
222 .borrow_mut()
223 .insert(interface_type, factory);
224 }
225
226 pub async fn create_interface(
230 &self,
231 interface_type: &str,
232 setup_data: ValueContainer,
233 priority: InterfacePriority,
234 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
235 info!(
236 "creating interface {interface_type} with setup data: {setup_data:?}"
237 );
238 let interface_factories = self.interface_factories.borrow();
239 if let Some(factory) = interface_factories.get(interface_type) {
240 let interface =
241 factory(setup_data).map_err(ComHubError::InterfaceError)?;
242 drop(interface_factories);
243
244 self.open_and_add_interface(interface.clone(), priority)
245 .await
246 .map(|_| interface)
247 } else {
248 Err(ComHubError::InterfaceTypeDoesNotExist)
249 }
250 }
251
252 fn try_downcast<T: 'static>(
253 input: Rc<RefCell<dyn ComInterface>>,
254 ) -> Option<Rc<RefCell<T>>> {
255 if input.borrow().as_any().is::<T>() {
257 let ptr = Rc::into_raw(input) as *const RefCell<T>;
259 unsafe { Some(Rc::from_raw(ptr)) }
260 } else {
261 None
262 }
263 }
264
265 pub fn get_interface_by_uuid<T: ComInterface>(
266 &self,
267 interface_uuid: &ComInterfaceUUID,
268 ) -> Option<Rc<RefCell<T>>> {
269 ComHub::try_downcast(
270 self.interfaces.borrow().get(interface_uuid)?.0.clone(),
271 )
272 }
273
274 pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
275 self.interfaces.borrow().contains_key(interface_uuid)
276 }
277
278 pub fn get_dyn_interface_by_uuid(
279 &self,
280 uuid: &ComInterfaceUUID,
281 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
282 self.interfaces
283 .borrow()
284 .get(uuid)
285 .map(|(interface, _)| interface.clone())
286 }
287
288 pub async fn open_and_add_interface(
289 &self,
290 interface: Rc<RefCell<dyn ComInterface>>,
291 priority: InterfacePriority,
292 ) -> Result<(), ComHubError> {
293 if interface.borrow().get_state() != ComInterfaceState::Connected {
294 if !(interface.borrow_mut().handle_open().await) {
298 return Err(ComHubError::InterfaceOpenError);
299 }
300 }
301 self.add_interface(interface.clone(), priority)
302 }
303
304 pub fn add_interface(
305 &self,
306 interface: Rc<RefCell<dyn ComInterface>>,
307 priority: InterfacePriority,
308 ) -> Result<(), ComHubError> {
309 let uuid = interface.borrow().get_uuid().clone();
310 let mut interfaces = self.interfaces.borrow_mut();
311 if interfaces.contains_key(&uuid) {
312 return Err(ComHubError::InterfaceAlreadyExists);
313 }
314
315 if priority != InterfacePriority::None
317 && interface.borrow_mut().get_properties().direction
318 == InterfaceDirection::In
319 {
320 return Err(
321 ComHubError::InvalidInterfaceDirectionForFallbackInterface,
322 );
323 }
324
325 interfaces.insert(uuid, (interface, priority));
326 Ok(())
327 }
328
329 pub async fn remove_interface(
332 &self,
333 interface_uuid: ComInterfaceUUID,
334 ) -> Result<(), ComHubError> {
335 info!("Removing interface {interface_uuid}");
336 let interface = self
337 .interfaces
338 .borrow_mut()
339 .get_mut(&interface_uuid.clone())
340 .ok_or(ComHubError::InterfaceDoesNotExist)?
341 .0
342 .clone();
343 {
344 let mut interface = interface.borrow_mut();
347 interface.handle_destroy().await;
348 }
349
350 self.update_sockets();
352
353 self.cleanup_interface(interface_uuid)
354 .ok_or(ComHubError::InterfaceDoesNotExist)?;
355
356 Ok(())
357 }
358
359 fn cleanup_interface(
362 &self,
363 interface_uuid: ComInterfaceUUID,
364 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
365 let interface = self
366 .interfaces
367 .borrow_mut()
368 .remove(&interface_uuid)
369 .or(None)?
370 .0;
371 Some(interface)
372 }
373
374 pub(crate) fn receive_block(
375 &self,
376 block: &DXBBlock,
377 socket_uuid: ComInterfaceSocketUUID,
378 ) {
379 info!("{} received block: {}", self.endpoint, block);
380
381 if !self.validate_block(block) {
383 warn!("Block validation failed. Dropping block...");
384 return;
385 }
386
387 let block_type = block.block_header.flags_and_timestamp.block_type();
388
389 let is_new_block = !self.block_handler.is_block_in_history(block);
391 if is_new_block && block.routing_header.sender != self.endpoint {
394 self.register_socket_endpoint_from_incoming_block(
395 socket_uuid.clone(),
396 block,
397 );
398 }
399
400 let receivers = block.receiver_endpoints();
401 if !receivers.is_empty() {
402 let is_for_own = receivers.iter().any(|e| {
403 e == &self.endpoint
404 || e == &Endpoint::ANY
405 || e == &Endpoint::ANY_ALL_INSTANCES
406 });
407
408 if is_for_own && block_type != BlockType::Hello {
410 info!("Block is for this endpoint");
411
412 match block_type {
413 BlockType::Trace => {
414 self.handle_trace_block(block, socket_uuid.clone());
415 }
416 BlockType::TraceBack => {
417 self.handle_trace_back_block(
418 block,
419 socket_uuid.clone(),
420 );
421 }
422 _ => {
423 self.block_handler.handle_incoming_block(block.clone());
424 }
425 };
426 }
427
428 let should_relay =
430 !(
432 is_for_own && block_type == BlockType::Hello
433 );
434
435 if should_relay {
437 let remaining_receivers = if is_for_own {
439 &self.get_remote_receivers(&receivers)
440 } else {
441 &receivers
442 };
443
444 if !remaining_receivers.is_empty() {
446 match block_type {
447 BlockType::Trace | BlockType::TraceBack => {
448 self.redirect_trace_block(
449 block.clone_with_new_receivers(
450 remaining_receivers,
451 ),
452 socket_uuid.clone(),
453 is_for_own,
454 );
455 }
456 _ => {
457 self.redirect_block(
458 block.clone_with_new_receivers(
459 remaining_receivers,
460 ),
461 socket_uuid.clone(),
462 is_for_own,
463 );
464 }
465 }
466 }
467 }
468 }
469
470 if is_new_block {
472 self.block_handler
473 .add_block_to_history(block, Some(socket_uuid));
474 }
475 }
476
477 fn get_remote_receivers(
480 &self,
481 receiver_endpoints: &[Endpoint],
482 ) -> Vec<Endpoint> {
483 receiver_endpoints
484 .iter()
485 .filter(|e| e != &&self.endpoint)
486 .cloned()
487 .collect::<Vec<_>>()
488 }
489
490 fn register_socket_endpoint_from_incoming_block(
493 &self,
494 socket_uuid: ComInterfaceSocketUUID,
495 block: &DXBBlock,
496 ) {
497 let socket = self.get_socket_by_uuid(&socket_uuid);
498 let mut socket_ref = socket.lock().unwrap();
499
500 let distance = block.routing_header.distance;
501 let sender = block.routing_header.sender.clone();
502
503 if socket_ref.direct_endpoint.is_none() && distance == 1 {
505 info!(
506 "Setting direct endpoint for socket {}: {}",
507 socket_ref.uuid, sender
508 );
509 socket_ref.direct_endpoint = Some(sender.clone());
510 }
511
512 drop(socket_ref);
513 match self.register_socket_endpoint(
514 socket.clone(),
515 sender.clone(),
516 distance,
517 ) {
518 Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
519 debug!(
520 "Socket already registered for endpoint {sender}",
521 );
522 }
523 Err(error) => {
524 panic!("Failed to register socket endpoint {sender}: {error:?}");
525 },
526 Ok(_) => { }
527 }
528 }
529
530 pub(crate) fn redirect_block(
533 &self,
534 mut block: DXBBlock,
535 incoming_socket: ComInterfaceSocketUUID,
536 forked: bool,
538 ) {
539 let receivers = block.receiver_endpoints();
540
541 let history_block_data =
544 self.block_handler.get_block_data_from_history(&block);
545 if history_block_data.is_some() {
546 for receiver in &receivers {
547 if receiver != &self.endpoint {
548 info!(
549 "{}: Adding socket {} to blacklist for receiver {}",
550 self.endpoint, incoming_socket, receiver
551 );
552 self.endpoint_sockets_blacklist
553 .borrow_mut()
554 .entry(receiver.clone())
555 .or_default()
556 .insert(incoming_socket.clone());
557 }
558 }
559 }
560
561 block.routing_header.distance += 1;
563
564 block.routing_header.ttl -= 1;
567 if block.routing_header.ttl == 0 {
569 warn!("Block TTL expired. Dropping block...");
570 return;
571 }
572
573 let mut prefer_incoming_socket_for_bounce_back = false;
574 let res = {
577 if block.routing_header.sender == self.endpoint {
578 prefer_incoming_socket_for_bounce_back =
580 !block.is_bounce_back();
581 Err(receivers.to_vec())
582 } else {
583 let mut excluded_sockets = vec![incoming_socket.clone()];
584 if let Some(BlockHistoryData {
585 original_socket_uuid: Some(original_socket_uuid),
586 }) = &history_block_data
587 {
588 excluded_sockets.push(original_socket_uuid.clone())
589 }
590 self.send_block(block.clone(), excluded_sockets, forked)
591 }
592 };
593
594 if let Err(unreachable_endpoints) = res {
596 let send_back_socket = if !prefer_incoming_socket_for_bounce_back
600 && let Some(history_block_data) = history_block_data
601 {
602 history_block_data.original_socket_uuid
603 } else {
604 Some(incoming_socket.clone())
605 };
606
607 if let Some(send_back_socket) = send_back_socket {
610 if block.is_bounce_back() && send_back_socket == incoming_socket
612 {
613 warn!(
614 "{}: Tried to send bounce back block back to incoming socket, but this is not allowed",
615 self.endpoint
616 );
617 } else if self
618 .get_socket_by_uuid(&send_back_socket)
619 .lock()
620 .unwrap()
621 .can_send()
622 {
623 block.set_bounce_back(true);
624 self.send_block_to_endpoints_via_socket(
625 block,
626 &send_back_socket,
627 &unreachable_endpoints,
628 if forked { Some(0) } else { None },
629 )
630 } else {
631 error!(
632 "Tried to send bounce back block, but cannot send back to incoming socket"
633 )
634 }
635 }
636 else {
640 self.send_block(block, vec![], forked).unwrap_or_else(|_| {
641 error!(
642 "Failed to send out block to {}",
643 unreachable_endpoints
644 .iter()
645 .map(|e| e.to_string())
646 .join(",")
647 );
648 });
649 }
650 }
651 }
652
653 fn validate_block(&self, block: &DXBBlock) -> bool {
656 let is_signed =
659 block.routing_header.flags.signature_type() != SignatureType::None;
660
661 match is_signed {
662 true => {
663 true
667 }
668 false => {
669 let endpoint = block.routing_header.sender.clone();
670 let is_trusted = {
671 cfg_if::cfg_if! {
672 if #[cfg(feature = "debug")] {
673 use crate::runtime::global_context::get_global_context;
674 get_global_context().debug_flags.allow_unsigned_blocks
675 }
676 else {
677 false
679 }
680 }
681 };
682 match is_trusted {
683 true => true,
684 false => {
685 warn!(
686 "Block received by {endpoint} is not signed. Dropping block..."
687 );
688 false
689 }
690 }
691 }
692 }
693 }
694
695 pub fn register_socket_endpoint(
700 &self,
701 socket: Arc<Mutex<ComInterfaceSocket>>,
702 endpoint: Endpoint,
703 distance: i8,
704 ) -> Result<(), SocketEndpointRegistrationError> {
705 log::info!(
706 "{} registering endpoint {} for socket {}",
707 self.endpoint,
708 endpoint,
709 socket.lock().unwrap().uuid
710 );
711 let socket_ref = socket.lock().unwrap();
712
713 let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
716
717 if !socket_ref.state.is_open() {
719 return Err(SocketEndpointRegistrationError::SocketDisconnected);
720 }
721
722 if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
724 && entries
725 .iter()
726 .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
727 {
728 return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
729 }
730
731 let socket_uuid = socket_ref.uuid.clone();
732 let channel_factor = socket_ref.channel_factor;
733 let direction = socket_ref.direction.clone();
734 drop(socket_ref);
735
736 self.add_socket_endpoint(&socket_uuid, endpoint.clone());
738
739 self.add_endpoint_socket(
741 &endpoint,
742 socket_uuid,
743 distance,
744 is_direct,
745 channel_factor,
746 direction,
747 );
748
749 self.sort_sockets(&endpoint);
751
752 Ok(())
753 }
754
755 pub async fn wait_for_update_async(&self) {
759 loop {
760 let mut is_done = true;
761 for interface in self.interfaces.borrow().values() {
762 let interface = interface.0.clone();
763 let interface = interface.borrow_mut();
764 let outgoing_blocks_count =
765 interface.get_info().outgoing_blocks_count.get();
766 if outgoing_blocks_count > 0 {
768 is_done = false;
769 break;
770 }
771 if interface.get_state() == ComInterfaceState::Connecting {
773 is_done = false;
774 break;
775 }
776 }
777 if is_done {
778 break;
779 }
780 sleep(Duration::from_millis(10)).await;
781 }
782 }
783
784 pub async fn update_async(&self) {
789 self.update();
790 self.wait_for_update_async().await;
791 }
792
793 fn add_endpoint_socket(
796 &self,
797 endpoint: &Endpoint,
798 socket_uuid: ComInterfaceSocketUUID,
799 distance: i8,
800 is_direct: bool,
801 channel_factor: u32,
802 direction: InterfaceDirection,
803 ) {
804 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
805 if !endpoint_sockets.contains_key(endpoint) {
806 endpoint_sockets.insert(endpoint.clone(), Vec::new());
807 }
808
809 let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
810 endpoint_sockets.push((
811 socket_uuid,
812 DynamicEndpointProperties {
813 known_since: Time::now(),
814 distance,
815 is_direct,
816 channel_factor,
817 direction,
818 },
819 ));
820 }
821
822 fn add_socket(
827 &self,
828 socket: Arc<Mutex<ComInterfaceSocket>>,
829 priority: InterfacePriority,
830 ) {
831 let socket_ref = socket.lock().unwrap();
832 let socket_uuid = socket_ref.uuid.clone();
833 if self.sockets.borrow().contains_key(&socket_ref.uuid) {
834 panic!("Socket {} already exists in ComHub", socket_ref.uuid);
835 }
836
837 if !socket_ref.can_send() && priority != InterfacePriority::None {
843 panic!(
844 "Socket {} cannot be used for fallback routing, since it has no send capability",
845 socket_ref.uuid
846 );
847 }
848 let direction = socket_ref.direction.clone();
849
850 self.sockets
851 .borrow_mut()
852 .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
853
854 if socket_ref.can_send() {
856 match priority {
857 InterfacePriority::None => {
858 }
860 InterfacePriority::Priority(priority) => {
861 self.add_fallback_socket(&socket_uuid, priority, direction);
863 }
864 }
865
866 let mut block: DXBBlock = DXBBlock::default();
868 block
869 .block_header
870 .flags_and_timestamp
871 .set_block_type(BlockType::Hello);
872 block
873 .routing_header
874 .flags
875 .set_signature_type(SignatureType::Unencrypted);
876 let block = self.prepare_own_block(block);
879
880 drop(socket_ref);
881 self.send_block_to_endpoints_via_socket(
882 block,
883 &socket_uuid,
884 &[Endpoint::ANY],
885 None,
886 );
887 }
888 }
889
890 fn add_fallback_socket(
894 &self,
895 socket_uuid: &ComInterfaceSocketUUID,
896 priority: u16,
897 direction: InterfaceDirection,
898 ) {
899 let mut fallback_sockets = self.fallback_sockets.borrow_mut();
901 fallback_sockets.push((socket_uuid.clone(), priority, direction));
902 fallback_sockets.sort_by_key(|(_, priority, direction)| {
905 let dir_rank = match direction {
906 InterfaceDirection::InOut => 0,
907 InterfaceDirection::Out => 1,
908 InterfaceDirection::In => {
909 panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
910 }
911 };
912 (dir_rank, std::cmp::Reverse(*priority))
913 });
914 }
915
916 fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
918 self.sockets
919 .borrow_mut()
920 .remove(socket_uuid)
921 .or_else(|| panic!("Socket {socket_uuid} not found in ComHub"));
922
923 self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
926 sockets.retain(|(uuid, _)| uuid != socket_uuid);
927 !sockets.is_empty()
928 });
929
930 self.fallback_sockets
932 .borrow_mut()
933 .retain(|(uuid, _, _)| uuid != socket_uuid);
934 }
935
936 fn add_socket_endpoint(
938 &self,
939 socket_uuid: &ComInterfaceSocketUUID,
940 endpoint: Endpoint,
941 ) {
942 assert!(
943 self.sockets.borrow().contains_key(socket_uuid),
944 "Socket not found in ComHub"
945 );
946 self.sockets
948 .borrow_mut()
949 .get_mut(socket_uuid)
950 .unwrap()
951 .1
952 .insert(endpoint.clone());
953 }
954
955 fn sort_sockets(&self, endpoint: &Endpoint) {
965 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
966 let sockets = endpoint_sockets.get_mut(endpoint).unwrap();
967
968 sockets.sort_by(|(_, a), (_, b)| {
969 b.is_direct
971 .cmp(&a.is_direct)
972 .then_with(|| b.channel_factor.cmp(&a.channel_factor))
973 .then_with(|| b.distance.cmp(&a.distance))
974 .then_with(
975 || {
976 cfg_if::cfg_if! {
977 if #[cfg(feature = "debug")] {
978 use crate::runtime::global_context::get_global_context;
979 use std::cmp::Ordering;
980 if get_global_context().debug_flags.enable_deterministic_behavior {
981 Ordering::Equal
982 }
983 else {
984 b.known_since.cmp(&a.known_since)
985 }
986 }
987 else {
988 b.known_since.cmp(&a.known_since)
989 }
990 }
991 }
992 )
993
994 });
995 }
996
997 pub(crate) fn get_socket_by_uuid(
1001 &self,
1002 socket_uuid: &ComInterfaceSocketUUID,
1003 ) -> Arc<Mutex<ComInterfaceSocket>> {
1004 self.sockets
1005 .borrow()
1006 .get(socket_uuid)
1007 .map(|socket| socket.0.clone())
1008 .unwrap_or_else(|| {
1009 panic!("Socket for uuid {socket_uuid} not found")
1010 })
1011 }
1012
1013 pub(crate) fn get_com_interface_by_uuid(
1017 &self,
1018 interface_uuid: &ComInterfaceUUID,
1019 ) -> Rc<RefCell<dyn ComInterface>> {
1020 self.interfaces
1021 .borrow()
1022 .get(interface_uuid)
1023 .unwrap_or_else(|| {
1024 panic!("Interface for uuid {interface_uuid} not found")
1025 })
1026 .0
1027 .clone()
1028 }
1029
1030 pub(crate) fn get_com_interface_from_socket_uuid(
1034 &self,
1035 socket_uuid: &ComInterfaceSocketUUID,
1036 ) -> Rc<RefCell<dyn ComInterface>> {
1037 let socket = self.get_socket_by_uuid(socket_uuid);
1038 let socket = socket.lock().unwrap();
1039 self.get_com_interface_by_uuid(&socket.interface_uuid)
1040 }
1041
1042 fn iterate_endpoint_sockets<'a>(
1046 &'a self,
1047 endpoint: &'a Endpoint,
1048 options: EndpointIterateOptions<'a>,
1049 ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1050 std::iter::from_coroutine(
1051 #[coroutine]
1052 move || {
1053 let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1054 let endpoint_sockets =
1056 endpoint_sockets_borrow.get(endpoint).cloned();
1057 if endpoint_sockets.is_none() {
1058 return;
1059 }
1060 for (socket_uuid, _) in endpoint_sockets.unwrap() {
1061 {
1062 let socket = self.get_socket_by_uuid(&socket_uuid);
1063 let socket = socket.lock().unwrap();
1064
1065 if options.only_direct
1067 && socket.direct_endpoint.is_some()
1068 && socket.direct_endpoint.as_ref().unwrap()
1069 == endpoint
1070 {
1071 debug!(
1072 "No direct socket found for endpoint {endpoint}. Skipping..."
1073 );
1074 continue;
1075 }
1076
1077 if options.exclude_sockets.contains(&socket.uuid) {
1079 debug!(
1080 "Socket {} is excluded for endpoint {}. Skipping...",
1081 socket.uuid, endpoint
1082 );
1083 continue;
1084 }
1085
1086 if !socket.can_send() {
1091 info!(
1092 "Socket {} is not outgoing for endpoint {}. Skipping...",
1093 socket.uuid, endpoint
1094 );
1095 return;
1096 }
1097 }
1098
1099 debug!(
1100 "Found matching socket {socket_uuid} for endpoint {endpoint}"
1101 );
1102 yield socket_uuid.clone()
1103 }
1104 },
1105 )
1106 }
1107
1108 fn find_known_endpoint_socket(
1110 &self,
1111 endpoint: &Endpoint,
1112 exclude_socket: &[ComInterfaceSocketUUID],
1113 ) -> Option<ComInterfaceSocketUUID> {
1114 match endpoint.instance {
1115 EndpointInstance::Any => {
1117 let options = EndpointIterateOptions {
1118 only_direct: false,
1119 exact_instance: false,
1120 exclude_sockets: exclude_socket,
1121 };
1122 if let Some(socket) =
1123 self.iterate_endpoint_sockets(endpoint, options).next()
1124 {
1125 return Some(socket);
1126 }
1127 None
1128 }
1129
1130 EndpointInstance::Instance(_) => {
1132 let options = EndpointIterateOptions {
1134 only_direct: false,
1135 exact_instance: true,
1136 exclude_sockets: exclude_socket,
1137 };
1138 if let Some(socket) =
1139 self.iterate_endpoint_sockets(endpoint, options).next()
1140 {
1141 return Some(socket);
1142 }
1143 None
1144 }
1145
1146 EndpointInstance::All => {
1148 todo!("#186 Undescribed by author.")
1149 }
1150 }
1151 }
1152
1153 fn find_best_endpoint_socket(
1157 &self,
1158 endpoint: &Endpoint,
1159 exclude_sockets: &[ComInterfaceSocketUUID],
1160 ) -> Option<ComInterfaceSocketUUID> {
1161 if endpoint == &self.endpoint
1164 && let Some(socket) = self
1165 .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1166 {
1167 return Some(socket);
1168 }
1169
1170 let matching_socket =
1172 self.find_known_endpoint_socket(endpoint, exclude_sockets);
1173
1174 if matching_socket.is_some() {
1176 matching_socket
1177 }
1178 else {
1180 let sockets = self.fallback_sockets.borrow();
1181 for (socket_uuid, _, _) in sockets.iter() {
1182 let socket = self.get_socket_by_uuid(socket_uuid);
1183 info!(
1184 "{}: Find best for {}: {} ({}); excluded:{}",
1185 self.endpoint,
1186 endpoint,
1187 socket_uuid,
1188 socket
1189 .lock()
1190 .unwrap()
1191 .direct_endpoint
1192 .clone()
1193 .map(|e| e.to_string())
1194 .unwrap_or("None".to_string()),
1195 exclude_sockets.contains(socket_uuid)
1196 );
1197 if !exclude_sockets.contains(socket_uuid) {
1198 return Some(socket_uuid.clone());
1199 }
1200 }
1201 None
1202 }
1203 }
1204
1205 fn get_outbound_receiver_groups(
1208 &self,
1209 block: &DXBBlock,
1212 mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1213 ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1214 let receivers = block.receiver_endpoints();
1215
1216 if !receivers.is_empty() {
1217 let endpoint_sockets = receivers
1218 .iter()
1219 .map(|e| {
1220 if let Some(blacklist) =
1222 self.endpoint_sockets_blacklist.borrow().get(e)
1223 {
1224 exclude_sockets.extend(blacklist.iter().cloned());
1225 }
1226 let socket =
1227 self.find_best_endpoint_socket(e, &exclude_sockets);
1228 (socket, e)
1229 })
1230 .group_by(|(socket, _)| socket.clone())
1231 .into_iter()
1232 .map(|(socket, group)| {
1233 let endpoints = group
1234 .map(|(_, endpoint)| endpoint.clone())
1235 .collect::<Vec<_>>();
1236 (socket, endpoints)
1237 })
1238 .collect::<Vec<_>>();
1239
1240 Some(endpoint_sockets)
1241 } else {
1242 None
1243 }
1244 }
1245
1246 pub fn _start_update_loop(self_rc: Rc<Self>) {
1252 if *self_rc.update_loop_running.borrow() {
1254 return;
1255 }
1256
1257 *self_rc.update_loop_running.borrow_mut() = true;
1259
1260 spawn_with_panic_notify(async move {
1261 while *self_rc.update_loop_running.borrow() {
1262 self_rc.update();
1263 sleep(Duration::from_millis(1)).await;
1264 }
1265 if let Some(sender) =
1266 self_rc.update_loop_stop_sender.borrow_mut().take()
1267 {
1268 sender.send(()).expect("Failed to send stop signal");
1269 }
1270 });
1271 }
1272
1273 pub fn update(&self) {
1278 self.update_interfaces();
1280
1281 self.update_sockets();
1283
1284 self.collect_incoming_data();
1286
1287 self.receive_incoming_blocks();
1289
1290 self.flush_outgoing_blocks();
1292 }
1293
1294 fn prepare_own_block(&self, mut block: DXBBlock) -> DXBBlock {
1298 let now = Time::now();
1300 block.routing_header.sender = self.endpoint.clone();
1301 block
1302 .block_header
1303 .flags_and_timestamp
1304 .set_creation_timestamp(now);
1305
1306 block.routing_header.distance = 1;
1308 block
1309 }
1310
1311 pub fn send_own_block(
1313 &self,
1314 mut block: DXBBlock,
1315 ) -> Result<(), Vec<Endpoint>> {
1316 block = self.prepare_own_block(block);
1317 self.block_handler.add_block_to_history(&block, None);
1319 self.send_block(block, vec![], false)
1320 }
1321
1322 pub async fn send_own_block_await_response(
1327 &self,
1328 block: DXBBlock,
1329 options: ResponseOptions,
1330 ) -> Vec<Result<Response, ResponseError>> {
1331 let context_id = block.block_header.context_id;
1332 let section_index = block.block_header.section_index;
1333
1334 let has_exact_receiver_count = block.has_exact_receiver_count();
1335 let receivers = block.receiver_endpoints();
1336
1337 let res = self.send_own_block(block);
1338 let failed_endpoints = res.err().unwrap_or_default();
1339
1340 #[cfg(feature = "tokio_runtime")]
1342 yield_now().await;
1343
1344 let timeout = options
1345 .timeout
1346 .unwrap_or_default(self.options.default_receive_timeout);
1347
1348 if has_exact_receiver_count {
1350 if (options.resolution_strategy
1352 == ResponseResolutionStrategy::ReturnOnAnyError
1353 || options.resolution_strategy
1354 == ResponseResolutionStrategy::ReturnOnFirstResult)
1355 && !failed_endpoints.is_empty()
1356 {
1357 return receivers
1359 .iter()
1360 .map(|receiver| {
1361 if failed_endpoints.contains(receiver) {
1362 Err(ResponseError::NotReachable(receiver.clone()))
1363 } else {
1364 Err(ResponseError::EarlyAbort(receiver.clone()))
1365 }
1366 })
1367 .collect::<Vec<_>>();
1368 }
1369
1370 let mut responses = HashMap::new();
1372 let mut missing_response_count = receivers.len();
1373 for receiver in &receivers {
1374 responses.insert(
1375 receiver.clone(),
1376 if failed_endpoints.contains(receiver) {
1377 Err(ResponseError::NotReachable(receiver.clone()))
1378 } else {
1379 Err(ResponseError::NoResponseAfterTimeout(
1380 receiver.clone(),
1381 timeout,
1382 ))
1383 },
1384 );
1385 }
1386 missing_response_count -= failed_endpoints.len();
1388
1389 info!(
1390 "Waiting for responses from receivers {}",
1391 receivers
1392 .iter()
1393 .map(|e| e.to_string())
1394 .collect::<Vec<_>>()
1395 .join(",")
1396 );
1397
1398 let mut rx = self
1399 .block_handler
1400 .register_incoming_block_observer(context_id, section_index);
1401
1402 let res = task::timeout(timeout, async {
1403 while let Some(section) = rx.next().await {
1404 let mut received_response = false;
1405 let mut sender = section.get_sender();
1407 if let Some(response) = responses.get_mut(&sender) {
1409 if response.is_err() {
1411 *response = Ok(Response::ExactResponse(sender.clone(), section));
1412 missing_response_count -= 1;
1413 info!("Received expected response from {sender}");
1414 received_response = true;
1415 }
1416 else {
1418 error!("Received multiple responses from the same sender: {sender}");
1419 }
1420 }
1421 else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1423 info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1424 sender = sender.any_instance_endpoint();
1425 if response.is_err() {
1427 *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1428 missing_response_count -= 1;
1429 received_response = true;
1430 }
1431 else {
1433 info!("Received multiple resolved responses from the {}", &sender);
1434 }
1435 }
1436 else {
1438 error!("Received response from unexpected sender: {}", &sender);
1439 }
1440
1441 if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1443 for (receiver, response) in responses.iter_mut() {
1445 if receiver != &sender {
1446 *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1447 }
1448 }
1449 break;
1450 }
1451
1452 if missing_response_count == 0 {
1454 break;
1455 }
1456 }
1457 }).await;
1458
1459 if res.is_err() {
1460 error!("Timeout waiting for responses");
1461 }
1462
1463 responses.into_values().collect::<Vec<_>>()
1465 }
1466 else {
1468 let mut responses = vec![];
1469
1470 let res = task::timeout(timeout, async {
1471 let mut rx =
1472 self.block_handler.register_incoming_block_observer(
1473 context_id,
1474 section_index,
1475 );
1476 while let Some(section) = rx.next().await {
1477 let sender = section.get_sender();
1479 info!("Received response from {sender}");
1480 responses.push(Ok(Response::UnspecifiedResponse(section)));
1482
1483 if options.resolution_strategy
1485 == ResponseResolutionStrategy::ReturnOnFirstResult
1486 {
1487 break;
1488 }
1489 }
1490 })
1491 .await;
1492
1493 if res.is_err() {
1494 info!("Timeout waiting for responses");
1495 }
1496
1497 responses
1498 }
1499 }
1500
1501 pub fn send_block(
1508 &self,
1509 mut block: DXBBlock,
1510 exclude_sockets: Vec<ComInterfaceSocketUUID>,
1511 forked: bool,
1512 ) -> Result<(), Vec<Endpoint>> {
1513 let outbound_receiver_groups =
1514 self.get_outbound_receiver_groups(&block, exclude_sockets);
1515
1516 if outbound_receiver_groups.is_none() {
1517 error!("No outbound receiver groups found for block");
1518 return Err(vec![]);
1519 }
1520
1521 let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1522
1523 let mut unreachable_endpoints = vec![];
1524
1525 let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1530 Some(0)
1531 } else {
1532 None
1533 };
1534
1535 block.set_bounce_back(false);
1536
1537 for (receiver_socket, endpoints) in outbound_receiver_groups {
1538 if let Some(socket_uuid) = receiver_socket {
1539 self.send_block_to_endpoints_via_socket(
1540 block.clone(),
1541 &socket_uuid,
1542 &endpoints,
1543 fork_count,
1544 );
1545 } else {
1546 error!(
1547 "{}: cannot send block, no receiver sockets found for endpoints {:?}",
1548 self.endpoint,
1549 endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>()
1550 );
1551 unreachable_endpoints.extend(endpoints);
1552 }
1553 if let Some(count) = fork_count {
1555 fork_count = Some(count + 1);
1556 }
1557 }
1558
1559 if !unreachable_endpoints.is_empty() {
1560 return Err(unreachable_endpoints);
1561 }
1562 Ok(())
1563 }
1564
1565 fn send_block_to_endpoints_via_socket(
1568 &self,
1569 mut block: DXBBlock,
1570 socket_uuid: &ComInterfaceSocketUUID,
1571 endpoints: &[Endpoint],
1572 fork_count: Option<usize>,
1574 ) {
1575 block.set_receivers(endpoints);
1576
1577 if block.is_bounce_back() {
1580 block.routing_header.distance -= 2;
1581 }
1582
1583 match block.block_header.flags_and_timestamp.block_type() {
1585 BlockType::Trace | BlockType::TraceBack => {
1586 let distance = block.routing_header.distance;
1587 let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1588 let bounce_back = block.is_bounce_back();
1589
1590 self.add_hop_to_block_trace_data(
1591 &mut block,
1592 NetworkTraceHop {
1593 endpoint: self.endpoint.clone(),
1594 distance,
1595 socket: NetworkTraceHopSocket::new(
1596 self.get_com_interface_from_socket_uuid(
1597 socket_uuid,
1598 )
1599 .borrow_mut()
1600 .get_properties(),
1601 socket_uuid.clone(),
1602 ),
1603 direction: NetworkTraceHopDirection::Outgoing,
1604 fork_nr: new_fork_nr,
1605 bounce_back,
1606 },
1607 );
1608 }
1609 _ => {}
1610 }
1611
1612 let socket = self.get_socket_by_uuid(socket_uuid);
1613 let mut socket_ref = socket.lock().unwrap();
1614
1615 let is_broadcast = endpoints
1616 .iter()
1617 .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1618
1619 if is_broadcast
1620 && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1621 && (direct_endpoint == &self.endpoint
1622 || direct_endpoint == &Endpoint::LOCAL)
1623 {
1624 return;
1625 }
1626
1627 match &block.to_bytes() {
1628 Ok(bytes) => {
1629 info!(
1630 "Sending block to socket {}: {}",
1631 socket_uuid,
1632 endpoints.iter().map(|e| e.to_string()).join(", ")
1633 );
1634
1635 socket_ref.queue_outgoing_block(bytes);
1637 }
1638 Err(err) => {
1639 error!("Failed to convert block to bytes: {err:?}");
1640 }
1641 }
1642 }
1643
1644 fn update_interfaces(&self) {
1647 let mut to_remove = Vec::new();
1648 for (interface, _) in self.interfaces.borrow().values() {
1649 let uuid = interface.borrow().get_uuid().clone();
1650 let state = interface.borrow().get_state();
1651
1652 if state.is_destroyed() {
1656 info!("Destroying interface on the ComHub {uuid}");
1657 to_remove.push(uuid);
1658 } else if state.is_not_connected()
1659 && interface.borrow_mut().get_properties().shall_reconnect()
1660 {
1661 let interface_rc = interface.clone();
1664 let mut interface = interface.borrow_mut();
1665
1666 let already_connecting =
1667 interface.get_state() == ComInterfaceState::Connecting;
1668
1669 if !already_connecting {
1670 let config = interface.get_properties_mut();
1671
1672 let reconnect_now = match &config.reconnection_config {
1673 ReconnectionConfig::InstantReconnect => true,
1674 ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1675 ReconnectionConfig::check_reconnect_timeout(
1676 config.close_timestamp,
1677 timeout,
1678 )
1679 }
1680 ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1681 timeout,
1682 attempts,
1683 } => {
1684 let max_attempts = attempts;
1685
1686 let attempts = config.reconnect_attempts.unwrap_or(0);
1688 let attempts = attempts + 1;
1689 if attempts > *max_attempts {
1690 to_remove.push(uuid.clone());
1691 return;
1692 }
1693
1694 config.reconnect_attempts = Some(attempts);
1695
1696 ReconnectionConfig::check_reconnect_timeout(
1697 config.close_timestamp,
1698 timeout,
1699 )
1700 }
1701 ReconnectionConfig::NoReconnect => false,
1702 };
1703 if reconnect_now {
1704 debug!("Reconnecting interface {uuid}");
1705 interface.set_state(ComInterfaceState::Connecting);
1706 spawn_with_panic_notify(async move {
1707 let interface = interface_rc.clone();
1708 let mut interface = interface.borrow_mut();
1709
1710 let config = interface.get_properties_mut();
1711 config.close_timestamp = None;
1712
1713 let current_attempts =
1714 config.reconnect_attempts.unwrap_or(0);
1715 config.reconnect_attempts =
1716 Some(current_attempts + 1);
1717
1718 let res = interface.handle_open().await;
1719 if res {
1720 interface
1721 .set_state(ComInterfaceState::Connected);
1722 } else {
1724 interface
1725 .set_state(ComInterfaceState::NotConnected);
1726 }
1727 });
1728 } else {
1729 debug!("Not reconnecting interface {uuid}");
1730 }
1731 }
1732 }
1733 }
1734
1735 for uuid in to_remove {
1736 self.cleanup_interface(uuid);
1737 }
1738 }
1739
1740 fn update_sockets(&self) {
1743 let mut new_sockets = Vec::new();
1744 let mut deleted_sockets = Vec::new();
1745 let mut registered_sockets = Vec::new();
1746
1747 for (interface, priority) in self.interfaces.borrow().values() {
1748 let socket_updates = interface.clone().borrow().get_sockets();
1749 let mut socket_updates = socket_updates.lock().unwrap();
1750
1751 registered_sockets
1752 .extend(socket_updates.socket_registrations.drain(..));
1753 new_sockets.extend(
1754 socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1755 );
1756 deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1757 }
1758
1759 for (socket, priority) in new_sockets {
1760 self.add_socket(socket.clone(), priority);
1761 }
1762 for socket_uuid in deleted_sockets {
1763 self.delete_socket(&socket_uuid);
1764 }
1765 for (socket_uuid, distance, endpoint) in registered_sockets {
1766 let socket = self.get_socket_by_uuid(&socket_uuid);
1767 self.register_socket_endpoint(socket, endpoint.clone(), distance)
1768 .unwrap_or_else(|e| {
1769 error!(
1770 "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
1771 );
1772 });
1773 }
1774 }
1775
1776 fn collect_incoming_data(&self) {
1779 for (socket, _) in self.sockets.borrow().values() {
1781 let mut socket_ref = socket.lock().unwrap();
1782 socket_ref.collect_incoming_data();
1783 }
1784 }
1785
1786 fn receive_incoming_blocks(&self) {
1789 let mut blocks = vec![];
1790 for (socket, _) in self.sockets.borrow().values() {
1792 let mut socket_ref = socket.lock().unwrap();
1793 let uuid = socket_ref.uuid.clone();
1794 let block_queue = socket_ref.get_incoming_block_queue();
1795 blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
1796 }
1797
1798 for (uuid, blocks) in blocks {
1799 for block in blocks.iter() {
1800 self.receive_block(block, uuid.clone());
1801 }
1802 }
1803 }
1804
1805 fn flush_outgoing_blocks(&self) {
1807 let interfaces = self.interfaces.borrow();
1808 for (interface, _) in interfaces.values() {
1809 com_interface::flush_outgoing_blocks(interface.clone());
1810 }
1811 }
1812}
1813
1814#[derive(Default, PartialEq, Debug)]
1815pub enum ResponseResolutionStrategy {
1816 #[default]
1823 ReturnAfterAllSettled,
1824
1825 ReturnOnAnyError,
1833
1834 ReturnOnFirstResponse,
1837
1838 ReturnOnFirstResult,
1841}
1842
1843#[derive(Default, Debug)]
1844pub enum ResponseTimeout {
1845 #[default]
1846 Default,
1847 Custom(Duration),
1848}
1849
1850impl ResponseTimeout {
1851 pub fn unwrap_or_default(self, default: Duration) -> Duration {
1852 match self {
1853 ResponseTimeout::Default => default,
1854 ResponseTimeout::Custom(timeout) => timeout,
1855 }
1856 }
1857}
1858
1859#[derive(Default, Debug)]
1860pub struct ResponseOptions {
1861 pub resolution_strategy: ResponseResolutionStrategy,
1862 pub timeout: ResponseTimeout,
1863}
1864
1865impl ResponseOptions {
1866 pub fn new_with_resolution_strategy(
1867 resolution_strategy: ResponseResolutionStrategy,
1868 ) -> Self {
1869 Self {
1870 resolution_strategy,
1871 ..ResponseOptions::default()
1872 }
1873 }
1874
1875 pub fn new_with_timeout(timeout: Duration) -> Self {
1876 Self {
1877 timeout: ResponseTimeout::Custom(timeout),
1878 ..ResponseOptions::default()
1879 }
1880 }
1881}
1882
1883#[derive(Debug)]
1884pub enum Response {
1885 ExactResponse(Endpoint, IncomingSection),
1886 ResolvedResponse(Endpoint, IncomingSection),
1887 UnspecifiedResponse(IncomingSection),
1888}
1889
1890impl Response {
1891 pub fn take_incoming_section(self) -> IncomingSection {
1892 match self {
1893 Response::ExactResponse(_, section) => section,
1894 Response::ResolvedResponse(_, section) => section,
1895 Response::UnspecifiedResponse(section) => section,
1896 }
1897 }
1898}
1899
1900#[derive(Debug)]
1901pub enum ResponseError {
1902 NoResponseAfterTimeout(Endpoint, Duration),
1903 NotReachable(Endpoint),
1904 EarlyAbort(Endpoint),
1905}
1906
1907impl Display for ResponseError {
1908 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1909 match self {
1910 ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
1911 write!(
1912 f,
1913 "No response after timeout ({}s) for endpoint {}",
1914 duration.as_secs(),
1915 endpoint
1916 )
1917 }
1918 ResponseError::NotReachable(endpoint) => {
1919 write!(f, "Endpoint {endpoint} is not reachable")
1920 }
1921 ResponseError::EarlyAbort(endpoint) => {
1922 write!(f, "Early abort for endpoint {endpoint}")
1923 }
1924 }
1925 }
1926}