1use crate::global::protocol_structures::block_header::BlockType;
2use crate::global::protocol_structures::routing_header::{
3 ReceiverEndpoints, SignatureType,
4};
5use crate::runtime::global_context::get_global_context;
6use crate::stdlib::{cell::RefCell, rc::Rc};
7use crate::task::{self, sleep, spawn_with_panic_notify};
8
9use futures::channel::oneshot::Sender;
10use futures_util::StreamExt;
11use itertools::Itertools;
12use log::{debug, error, info, warn};
13use std::cmp::{Ordering, PartialEq};
14use std::collections::{HashMap, HashSet};
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18#[cfg(feature = "tokio_runtime")]
19use tokio::task::yield_now;
20use super::com_interfaces::com_interface::{
23 self, ComInterfaceError, ComInterfaceState
24};
25use super::com_interfaces::{
26 com_interface::ComInterface, com_interface_socket::ComInterfaceSocket,
27};
28use crate::values::core_values::endpoint::{Endpoint, EndpointInstance};
29use crate::global::dxb_block::{DXBBlock, IncomingSection};
30use crate::network::block_handler::{BlockHandler, BlockHistoryData};
31use crate::network::com_hub_network_tracing::{NetworkTraceHop, NetworkTraceHopDirection, NetworkTraceHopSocket};
32use crate::network::com_interfaces::com_interface::ComInterfaceUUID;
33use crate::network::com_interfaces::com_interface_properties::{
34 InterfaceDirection, ReconnectionConfig,
35};
36use crate::network::com_interfaces::com_interface_socket::ComInterfaceSocketUUID;
37use crate::network::com_interfaces::default_com_interfaces::local_loopback_interface::LocalLoopbackInterface;
38use crate::values::value_container::ValueContainer;
39
40#[derive(Debug, Clone)]
41pub struct DynamicEndpointProperties {
42 pub known_since: u64,
43 pub distance: i8,
44 pub is_direct: bool,
45 pub channel_factor: u32,
46 pub direction: InterfaceDirection,
47}
48
49pub type ComInterfaceFactoryFn =
50 fn(
51 setup_data: ValueContainer,
52 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComInterfaceError>;
53
54#[derive(Debug)]
55pub struct ComHubOptions {
56 default_receive_timeout: Duration,
57}
58
59impl Default for ComHubOptions {
60 fn default() -> Self {
61 ComHubOptions {
62 default_receive_timeout: Duration::from_secs(5),
63 }
64 }
65}
66
67pub struct ComHub {
68 pub endpoint: Endpoint,
70
71 pub options: ComHubOptions,
73
74 pub interface_factories: RefCell<HashMap<String, ComInterfaceFactoryFn>>,
76
77 pub interfaces: RefCell<
79 HashMap<
80 ComInterfaceUUID,
81 (Rc<RefCell<dyn ComInterface>>, InterfacePriority),
82 >,
83 >,
84
85 pub sockets: RefCell<
88 HashMap<
89 ComInterfaceSocketUUID,
90 (Arc<Mutex<ComInterfaceSocket>>, HashSet<Endpoint>),
91 >,
92 >,
93
94 pub endpoint_sockets_blacklist:
96 RefCell<HashMap<Endpoint, HashSet<ComInterfaceSocketUUID>>>,
97
98 pub fallback_sockets:
101 RefCell<Vec<(ComInterfaceSocketUUID, u16, InterfaceDirection)>>,
102
103 pub endpoint_sockets: RefCell<
106 HashMap<
107 Endpoint,
108 Vec<(ComInterfaceSocketUUID, DynamicEndpointProperties)>,
109 >,
110 >,
111
112 update_loop_running: RefCell<bool>,
115 update_loop_stop_sender: RefCell<Option<Sender<()>>>,
116
117 pub block_handler: BlockHandler,
118}
119
120impl Debug for ComHub {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("ComHub")
123 .field("endpoint", &self.endpoint)
124 .field("options", &self.options)
125 .field("sockets", &self.sockets)
126 .field("endpoint_sockets_blacklist", &self.endpoint_sockets_blacklist)
127 .field("fallback_sockets", &self.fallback_sockets)
128 .field("endpoint_sockets", &self.endpoint_sockets)
129 .finish()
130 }
131}
132
133#[derive(Debug, Clone, Default)]
134struct EndpointIterateOptions<'a> {
135 pub only_direct: bool,
136 pub exact_instance: bool,
137 pub exclude_sockets: &'a [ComInterfaceSocketUUID],
138}
139
140impl Default for ComHub {
141 fn default() -> Self {
142 ComHub {
143 endpoint: Endpoint::default(),
144 options: ComHubOptions::default(),
145 interface_factories: RefCell::new(HashMap::new()),
146 interfaces: RefCell::new(HashMap::new()),
147 endpoint_sockets: RefCell::new(HashMap::new()),
148 block_handler: BlockHandler::new(),
149 sockets: RefCell::new(HashMap::new()),
150 fallback_sockets: RefCell::new(Vec::new()),
151 endpoint_sockets_blacklist: RefCell::new(HashMap::new()),
152 update_loop_running: RefCell::new(false),
153 update_loop_stop_sender: RefCell::new(None),
154 }
155 }
156}
157
158#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
159pub enum InterfacePriority {
160 None,
164 Priority(u16),
168}
169
170impl Default for InterfacePriority {
171 fn default() -> Self {
172 InterfacePriority::Priority(0)
173 }
174}
175
176#[derive(Debug, Clone, PartialEq)]
177pub enum ComHubError {
178 InterfaceError(ComInterfaceError),
179 InterfaceCloseFailed,
180 InterfaceNotConnected,
181 InterfaceDoesNotExist,
182 InterfaceAlreadyExists,
183 InterfaceTypeDoesNotExist,
184 InvalidInterfaceDirectionForFallbackInterface,
185 NoResponse,
186 InterfaceOpenError,
187}
188
189#[derive(Debug)]
190pub enum SocketEndpointRegistrationError {
191 SocketDisconnected,
192 SocketUninitialized,
193 SocketEndpointAlreadyRegistered,
194}
195
196impl ComHub {
197 pub fn new(endpoint: impl Into<Endpoint>) -> ComHub {
198 ComHub {
199 endpoint: endpoint.into(),
200 ..ComHub::default()
201 }
202 }
203
204 pub async fn init(&self) -> Result<(), ComHubError> {
205 let local_interface = LocalLoopbackInterface::new();
207 self.open_and_add_interface(
208 Rc::new(RefCell::new(local_interface)),
209 InterfacePriority::None,
210 )
211 .await
212 }
213
214 pub fn register_interface_factory(
217 &self,
218 interface_type: String,
219 factory: ComInterfaceFactoryFn,
220 ) {
221 self.interface_factories
222 .borrow_mut()
223 .insert(interface_type, factory);
224 }
225
226 pub async fn create_interface<'a>(
230 &self,
231 interface_type: &str,
232 setup_data: ValueContainer,
233 priority: InterfacePriority,
234 ) -> Result<Rc<RefCell<dyn ComInterface>>, ComHubError> {
235 info!("creating interface {interface_type}");
236 let interface_factories = self.interface_factories.borrow();
237 if let Some(factory) = interface_factories.get(interface_type) {
238 let interface =
239 factory(setup_data).map_err(ComHubError::InterfaceError)?;
240 drop(interface_factories);
241
242 self
243 .open_and_add_interface(interface.clone(), priority)
244 .await
245 .map(|_| interface)
246 } else {
247 Err(ComHubError::InterfaceTypeDoesNotExist)
248 }
249 }
250
251 fn try_downcast<T: 'static>(
252 input: Rc<RefCell<dyn ComInterface>>,
253 ) -> Option<Rc<RefCell<T>>> {
254 if input.borrow().as_any().is::<T>() {
256 let ptr = Rc::into_raw(input) as *const RefCell<T>;
258 unsafe { Some(Rc::from_raw(ptr)) }
259 } else {
260 None
261 }
262 }
263
264 pub fn get_interface_by_uuid<T: ComInterface>(
265 &self,
266 interface_uuid: &ComInterfaceUUID,
267 ) -> Option<Rc<RefCell<T>>> {
268 ComHub::try_downcast(
269 self.interfaces.borrow().get(interface_uuid)?.0.clone(),
270 )
271 }
272
273 pub fn has_interface(&self, interface_uuid: &ComInterfaceUUID) -> bool {
274 self.interfaces.borrow().contains_key(interface_uuid)
275 }
276
277 pub fn get_dyn_interface_by_uuid(
278 &self,
279 uuid: &ComInterfaceUUID,
280 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
281 self.interfaces
282 .borrow()
283 .get(uuid)
284 .map(|(interface, _)| interface.clone())
285 }
286
287 pub async fn open_and_add_interface(
288 &self,
289 interface: Rc<RefCell<dyn ComInterface>>,
290 priority: InterfacePriority,
291 ) -> Result<(), ComHubError> {
292 if interface.borrow().get_state() != ComInterfaceState::Connected {
293 if !(interface.borrow_mut().handle_open().await) {
297 return Err(ComHubError::InterfaceOpenError);
298 }
299 }
300 self.add_interface(interface.clone(), priority)
301 }
302
303 pub fn add_interface(
304 &self,
305 interface: Rc<RefCell<dyn ComInterface>>,
306 priority: InterfacePriority,
307 ) -> Result<(), ComHubError> {
308 let uuid = interface.borrow().get_uuid().clone();
309 let mut interfaces = self.interfaces.borrow_mut();
310 if interfaces.contains_key(&uuid) {
311 return Err(ComHubError::InterfaceAlreadyExists);
312 }
313
314 if priority != InterfacePriority::None
316 && interface.borrow_mut().get_properties().direction
317 == InterfaceDirection::In
318 {
319 return Err(
320 ComHubError::InvalidInterfaceDirectionForFallbackInterface,
321 );
322 }
323
324 interfaces.insert(uuid, (interface, priority));
325 Ok(())
326 }
327
328 pub async fn remove_interface(
331 &self,
332 interface_uuid: ComInterfaceUUID,
333 ) -> Result<(), ComHubError> {
334 info!("Removing interface {interface_uuid}");
335 let interface = self
336 .interfaces
337 .borrow_mut()
338 .get_mut(&interface_uuid.clone())
339 .ok_or(ComHubError::InterfaceDoesNotExist)?
340 .0
341 .clone();
342 {
343 let mut interface = interface.borrow_mut();
346 interface.handle_destroy().await;
347 }
348
349 self.update_sockets();
351
352 self.cleanup_interface(interface_uuid)
353 .ok_or(ComHubError::InterfaceDoesNotExist)?;
354
355 Ok(())
356 }
357
358 fn cleanup_interface(
361 &self,
362 interface_uuid: ComInterfaceUUID,
363 ) -> Option<Rc<RefCell<dyn ComInterface>>> {
364 let interface = self
365 .interfaces
366 .borrow_mut()
367 .remove(&interface_uuid)
368 .or(None)?
369 .0;
370 Some(interface)
371 }
372
373 pub(crate) fn receive_block(
374 &self,
375 block: &DXBBlock,
376 socket_uuid: ComInterfaceSocketUUID,
377 ) {
378 info!("{} received block: {}", self.endpoint, block);
379
380 if !self.validate_block(block) {
382 warn!("Block validation failed. Dropping block...");
383 return;
384 }
385
386 let block_type = block.block_header.flags_and_timestamp.block_type();
387
388 let is_new_block = !self.block_handler.is_block_in_history(block);
390 if is_new_block && block.routing_header.sender != self.endpoint {
393 self.register_socket_endpoint_from_incoming_block(
394 socket_uuid.clone(),
395 block,
396 );
397 }
398
399 if let Some(receivers) = &block.routing_header.receivers.endpoints {
400 let is_for_own = receivers.endpoints.iter().any(|e| {
401 e == &self.endpoint
402 || e == &Endpoint::ANY
403 || e == &Endpoint::ANY_ALL_INSTANCES
404 });
405
406 if is_for_own && block_type != BlockType::Hello {
408 info!("Block is for this endpoint");
409
410 match block_type {
411 BlockType::Trace => {
412 self.handle_trace_block(block, socket_uuid.clone());
413 }
414 BlockType::TraceBack => {
415 self.handle_trace_back_block(
416 block,
417 socket_uuid.clone(),
418 );
419 }
420 _ => {
421 self.block_handler.handle_incoming_block(block.clone());
422 }
423 };
424 }
425
426 let should_relay =
428 !(
430 is_for_own && block_type == BlockType::Hello
431 );
432
433 if should_relay {
435 let remaining_receivers = if is_for_own {
437 &self.get_remote_receivers(receivers)
438 } else {
439 &receivers.endpoints
440 };
441
442 if !remaining_receivers.is_empty() {
444 match block_type {
445 BlockType::Trace | BlockType::TraceBack => {
446 self.redirect_trace_block(
447 block.clone_with_new_receivers(
448 remaining_receivers,
449 ),
450 socket_uuid.clone(),
451 is_for_own,
452 );
453 }
454 _ => {
455 self.redirect_block(
456 block.clone_with_new_receivers(
457 remaining_receivers,
458 ),
459 socket_uuid.clone(),
460 is_for_own,
461 );
462 }
463 }
464 }
465 }
466 }
467
468 if is_new_block {
470 self.block_handler
471 .add_block_to_history(block, Some(socket_uuid));
472 }
473 }
474
475 fn get_remote_receivers(
478 &self,
479 receiver_endpoints: &ReceiverEndpoints,
480 ) -> Vec<Endpoint> {
481 receiver_endpoints
482 .endpoints
483 .iter()
484 .filter(|e| e != &&self.endpoint)
485 .cloned()
486 .collect::<Vec<_>>()
487 }
488
489 fn register_socket_endpoint_from_incoming_block(
492 &self,
493 socket_uuid: ComInterfaceSocketUUID,
494 block: &DXBBlock,
495 ) {
496 let socket = self.get_socket_by_uuid(&socket_uuid);
497 let mut socket_ref = socket.lock().unwrap();
498
499 let distance = block.routing_header.distance;
500 let sender = block.routing_header.sender.clone();
501
502 if socket_ref.direct_endpoint.is_none() && distance == 1 {
504 info!(
505 "Setting direct endpoint for socket {}: {}",
506 socket_ref.uuid, sender
507 );
508 socket_ref.direct_endpoint = Some(sender.clone());
509 }
510
511 drop(socket_ref);
512 match self.register_socket_endpoint(
513 socket.clone(),
514 sender.clone(),
515 distance,
516 ) {
517 Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered) => {
518 debug!(
519 "Socket already registered for endpoint {sender}",
520 );
521 }
522 Err(error) => {
523 panic!("Failed to register socket endpoint {sender}: {error:?}");
524 },
525 Ok(_) => { }
526 }
527 }
528
529 pub(crate) fn redirect_block(
532 &self,
533 mut block: DXBBlock,
534 incoming_socket: ComInterfaceSocketUUID,
535 forked: bool,
537 ) {
538 let receivers = block.get_receivers();
539
540 let history_block_data =
543 self.block_handler.get_block_data_from_history(&block);
544 if history_block_data.is_some() {
545 for receiver in &receivers {
546 if receiver != &self.endpoint {
547 info!(
548 "{}: Adding socket {} to blacklist for receiver {}",
549 self.endpoint, incoming_socket, receiver
550 );
551 self.endpoint_sockets_blacklist
552 .borrow_mut()
553 .entry(receiver.clone())
554 .or_default()
555 .insert(incoming_socket.clone());
556 }
557 }
558 }
559
560 block.routing_header.distance += 1;
562
563 block.routing_header.ttl -= 1;
566 if block.routing_header.ttl == 0 {
568 warn!("Block TTL expired. Dropping block...");
569 return;
570 }
571
572 let mut prefer_incoming_socket_for_bounce_back = false;
573 let res = {
576 if block.routing_header.sender == self.endpoint {
577 prefer_incoming_socket_for_bounce_back =
579 !block.is_bounce_back();
580 Err(receivers.to_vec())
581 } else {
582 let mut excluded_sockets = vec![incoming_socket.clone()];
583 if let Some(BlockHistoryData {
584 original_socket_uuid: Some(original_socket_uuid),
585 }) = &history_block_data
586 {
587 excluded_sockets.push(original_socket_uuid.clone())
588 }
589 self.send_block(block.clone(), excluded_sockets, forked)
590 }
591 };
592
593 if let Err(unreachable_endpoints) = res {
595 let send_back_socket = if !prefer_incoming_socket_for_bounce_back
599 && let Some(history_block_data) = history_block_data
600 {
601 history_block_data.original_socket_uuid
602 } else {
603 Some(incoming_socket.clone())
604 };
605
606 if let Some(send_back_socket) = send_back_socket {
609 if block.is_bounce_back() && send_back_socket == incoming_socket
611 {
612 warn!("{}: Tried to send bounce back block back to incoming socket, but this is not allowed", self.endpoint);
613 } else if self
614 .get_socket_by_uuid(&send_back_socket)
615 .lock()
616 .unwrap()
617 .can_send()
618 {
619 block.set_bounce_back(true);
620 self.send_block_to_endpoints_via_socket(
621 block,
622 &send_back_socket,
623 &unreachable_endpoints,
624 if forked { Some(0) } else { None },
625 )
626 } else {
627 error!("Tried to send bounce back block, but cannot send back to incoming socket")
628 }
629 }
630 else {
634 self.send_block(block, vec![], forked).unwrap_or_else(|_| {
635 error!(
636 "Failed to send out block to {}",
637 unreachable_endpoints
638 .iter()
639 .map(|e| e.to_string())
640 .join(",")
641 );
642 });
643 }
644 }
645 }
646
647 fn validate_block(&self, block: &DXBBlock) -> bool {
648 let is_signed =
651 block.routing_header.flags.signature_type() != SignatureType::None;
652
653 match is_signed {
654 true => {
655 true
659 }
660 false => {
661 let endpoint = block.routing_header.sender.clone();
662 let is_trusted = {
663 cfg_if::cfg_if! {
664 if #[cfg(feature = "debug")] {
665 get_global_context().debug_flags.allow_unsigned_blocks
666 }
667 else {
668 false
670 }
671 }
672 };
673 match is_trusted {
674 true => true,
675 false => {
676 warn!("Block received by {endpoint} is not signed. Dropping block...");
677 false
678 }
679 }
680 }
681 }
682 }
683
684 pub fn register_socket_endpoint(
689 &self,
690 socket: Arc<Mutex<ComInterfaceSocket>>,
691 endpoint: Endpoint,
692 distance: i8,
693 ) -> Result<(), SocketEndpointRegistrationError> {
694 log::info!(
695 "{} registering endpoint {} for socket {}",
696 self.endpoint,
697 endpoint,
698 socket.lock().unwrap().uuid
699 );
700 let socket_ref = socket.lock().unwrap();
701
702 let is_direct = socket_ref.direct_endpoint == Some(endpoint.clone());
705
706 if !socket_ref.state.is_open() {
708 return Err(SocketEndpointRegistrationError::SocketDisconnected);
709 }
710
711 if let Some(entries) = self.endpoint_sockets.borrow().get(&endpoint)
713 && entries
714 .iter()
715 .any(|(socket_uuid, _)| socket_uuid == &socket_ref.uuid)
716 {
717 return Err(SocketEndpointRegistrationError::SocketEndpointAlreadyRegistered);
718 }
719
720 let socket_uuid = socket_ref.uuid.clone();
721 let channel_factor = socket_ref.channel_factor;
722 let direction = socket_ref.direction.clone();
723 drop(socket_ref);
724
725 self.add_socket_endpoint(&socket_uuid, endpoint.clone());
727
728 self.add_endpoint_socket(
730 &endpoint,
731 socket_uuid,
732 distance,
733 is_direct,
734 channel_factor,
735 direction,
736 );
737
738 self.sort_sockets(&endpoint);
740
741 Ok(())
742 }
743
744 pub async fn wait_for_update_async(&self) {
748 loop {
749 let mut is_done = true;
750 for interface in self.interfaces.borrow().values() {
751 let interface = interface.0.clone();
752 let interface = interface.borrow_mut();
753 let outgoing_blocks_count =
754 interface.get_info().outgoing_blocks_count.get();
755 if outgoing_blocks_count > 0 {
757 is_done = false;
758 break;
759 }
760 if interface.get_state() == ComInterfaceState::Connecting {
762 is_done = false;
763 break;
764 }
765 }
766 if is_done {
767 break;
768 }
769 sleep(Duration::from_millis(10)).await;
770 }
771 }
772
773 pub async fn update_async(&self) {
778 self.update();
779 self.wait_for_update_async().await;
780 }
781
782 fn add_endpoint_socket(
785 &self,
786 endpoint: &Endpoint,
787 socket_uuid: ComInterfaceSocketUUID,
788 distance: i8,
789 is_direct: bool,
790 channel_factor: u32,
791 direction: InterfaceDirection,
792 ) {
793 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
794 if !endpoint_sockets.contains_key(endpoint) {
795 endpoint_sockets.insert(endpoint.clone(), Vec::new());
796 }
797
798 let endpoint_sockets = endpoint_sockets.get_mut(endpoint).unwrap();
799 endpoint_sockets.push((
800 socket_uuid,
801 DynamicEndpointProperties {
802 known_since: get_global_context().time.lock().unwrap().now(),
803 distance,
804 is_direct,
805 channel_factor,
806 direction,
807 },
808 ));
809 }
810
811 fn add_socket(
816 &self,
817 socket: Arc<Mutex<ComInterfaceSocket>>,
818 priority: InterfacePriority,
819 ) {
820 let socket_ref = socket.lock().unwrap();
821 let socket_uuid = socket_ref.uuid.clone();
822 if self.sockets.borrow().contains_key(&socket_ref.uuid) {
823 panic!("Socket {} already exists in ComHub", socket_ref.uuid);
824 }
825
826 if !socket_ref.can_send() && priority != InterfacePriority::None {
832 panic!(
833 "Socket {} cannot be used for fallback routing, since it has no send capability",
834 socket_ref.uuid
835 );
836 }
837 let direction = socket_ref.direction.clone();
838
839 self.sockets
840 .borrow_mut()
841 .insert(socket_ref.uuid.clone(), (socket.clone(), HashSet::new()));
842
843 if socket_ref.can_send() {
845 match priority {
846 InterfacePriority::None => {
847 }
849 InterfacePriority::Priority(priority) => {
850 self.add_fallback_socket(&socket_uuid, priority, direction);
852 }
853 }
854
855 let mut block: DXBBlock = DXBBlock::default();
857 block
858 .block_header
859 .flags_and_timestamp
860 .set_block_type(BlockType::Hello);
861 block
862 .routing_header
863 .flags
864 .set_signature_type(SignatureType::Unencrypted);
865 let block = self.prepare_own_block(block);
868
869 drop(socket_ref);
870 self.send_block_to_endpoints_via_socket(
871 block,
872 &socket_uuid,
873 &[Endpoint::ANY],
874 None,
875 );
876 }
877 }
878
879 fn add_fallback_socket(
883 &self,
884 socket_uuid: &ComInterfaceSocketUUID,
885 priority: u16,
886 direction: InterfaceDirection,
887 ) {
888 let mut fallback_sockets = self.fallback_sockets.borrow_mut();
890 fallback_sockets.push((socket_uuid.clone(), priority, direction));
891 fallback_sockets.sort_by_key(|(_, priority, direction)| {
894 let dir_rank = match direction {
895 InterfaceDirection::InOut => 0,
896 InterfaceDirection::Out => 1,
897 InterfaceDirection::In => {
898 panic!("Socket {socket_uuid} is not allowed to be used as fallback socket")
899 }
900 };
901 (dir_rank, std::cmp::Reverse(*priority))
902 });
903 }
904
905 fn delete_socket(&self, socket_uuid: &ComInterfaceSocketUUID) {
907 self.sockets
908 .borrow_mut()
909 .remove(socket_uuid)
910 .or_else(|| panic!("Socket {socket_uuid} not found in ComHub"));
911
912 self.endpoint_sockets.borrow_mut().retain(|_, sockets| {
915 sockets.retain(|(uuid, _)| uuid != socket_uuid);
916 !sockets.is_empty()
917 });
918
919 self.fallback_sockets
921 .borrow_mut()
922 .retain(|(uuid, _, _)| uuid != socket_uuid);
923 }
924
925 fn add_socket_endpoint(
927 &self,
928 socket_uuid: &ComInterfaceSocketUUID,
929 endpoint: Endpoint,
930 ) {
931 assert!(
932 self.sockets.borrow().contains_key(socket_uuid),
933 "Socket not found in ComHub"
934 );
935 self.sockets
937 .borrow_mut()
938 .get_mut(socket_uuid)
939 .unwrap()
940 .1
941 .insert(endpoint.clone());
942 }
943
944 fn sort_sockets(&self, endpoint: &Endpoint) {
953 let mut endpoint_sockets = self.endpoint_sockets.borrow_mut();
954 let sockets = endpoint_sockets.get_mut(endpoint).unwrap();
955
956 sockets.sort_by(|(_, a), (_, b)| {
957 b.is_direct
959 .cmp(&a.is_direct)
960 .then_with(|| b.channel_factor.cmp(&a.channel_factor))
961 .then_with(|| b.distance.cmp(&a.distance))
962 .then_with(
963 || {
964 cfg_if::cfg_if! {
965 if #[cfg(feature = "debug")] {
966 if get_global_context().debug_flags.enable_deterministic_behavior {
967 Ordering::Equal
968 }
969 else {
970 b.known_since.cmp(&a.known_since)
971 }
972 }
973 else {
974 b.known_since.cmp(&a.known_since)
975 }
976 }
977 }
978 )
979
980 });
981 }
982
983 pub(crate) fn get_socket_by_uuid(
987 &self,
988 socket_uuid: &ComInterfaceSocketUUID,
989 ) -> Arc<Mutex<ComInterfaceSocket>> {
990 self.sockets
991 .borrow()
992 .get(socket_uuid)
993 .map(|socket| socket.0.clone())
994 .unwrap_or_else(|| {
995 panic!("Socket for uuid {socket_uuid} not found")
996 })
997 }
998
999 pub(crate) fn get_com_interface_by_uuid(
1003 &self,
1004 interface_uuid: &ComInterfaceUUID,
1005 ) -> Rc<RefCell<dyn ComInterface>> {
1006 self.interfaces
1007 .borrow()
1008 .get(interface_uuid)
1009 .unwrap_or_else(|| {
1010 panic!("Interface for uuid {interface_uuid} not found")
1011 })
1012 .0
1013 .clone()
1014 }
1015
1016 pub(crate) fn get_com_interface_from_socket_uuid(
1020 &self,
1021 socket_uuid: &ComInterfaceSocketUUID,
1022 ) -> Rc<RefCell<dyn ComInterface>> {
1023 let socket = self.get_socket_by_uuid(socket_uuid);
1024 let socket = socket.lock().unwrap();
1025 self.get_com_interface_by_uuid(&socket.interface_uuid)
1026 }
1027
1028 fn iterate_endpoint_sockets<'a>(
1032 &'a self,
1033 endpoint: &'a Endpoint,
1034 options: EndpointIterateOptions<'a>,
1035 ) -> impl Iterator<Item = ComInterfaceSocketUUID> + 'a {
1036 std::iter::from_coroutine(
1037 #[coroutine]
1038 move || {
1039 let endpoint_sockets_borrow = self.endpoint_sockets.borrow();
1040 let endpoint_sockets =
1042 endpoint_sockets_borrow.get(endpoint).cloned();
1043 if endpoint_sockets.is_none() {
1044 return;
1045 }
1046 for (socket_uuid, _) in endpoint_sockets.unwrap() {
1047 {
1048 let socket = self.get_socket_by_uuid(&socket_uuid);
1049 let socket = socket.lock().unwrap();
1050
1051 if options.only_direct
1053 && socket.direct_endpoint.is_some()
1054 && socket.direct_endpoint.as_ref().unwrap()
1055 == endpoint
1056 {
1057 debug!(
1058 "No direct socket found for endpoint {endpoint}. Skipping..."
1059 );
1060 continue;
1061 }
1062
1063 if options.exclude_sockets.contains(&socket.uuid) {
1065 debug!(
1066 "Socket {} is excluded for endpoint {}. Skipping...",
1067 socket.uuid,
1068 endpoint
1069 );
1070 continue;
1071 }
1072
1073 if !socket.can_send() {
1078 info!(
1079 "Socket {} is not outgoing for endpoint {}. Skipping...",
1080 socket.uuid,
1081 endpoint
1082 );
1083 return;
1084 }
1085 }
1086
1087 debug!(
1088 "Found matching socket {socket_uuid} for endpoint {endpoint}"
1089 );
1090 yield socket_uuid.clone()
1091 }
1092 },
1093 )
1094 }
1095
1096 fn find_known_endpoint_socket(
1098 &self,
1099 endpoint: &Endpoint,
1100 exclude_socket: &[ComInterfaceSocketUUID],
1101 ) -> Option<ComInterfaceSocketUUID> {
1102 match endpoint.instance {
1103 EndpointInstance::Any => {
1105 let options = EndpointIterateOptions {
1106 only_direct: false,
1107 exact_instance: false,
1108 exclude_sockets: exclude_socket,
1109 };
1110 if let Some(socket) =
1111 self.iterate_endpoint_sockets(endpoint, options).next()
1112 {
1113 return Some(socket);
1114 }
1115 None
1116 }
1117
1118 EndpointInstance::Instance(_) => {
1120 let options = EndpointIterateOptions {
1122 only_direct: false,
1123 exact_instance: true,
1124 exclude_sockets: exclude_socket,
1125 };
1126 if let Some(socket) =
1127 self.iterate_endpoint_sockets(endpoint, options).next()
1128 {
1129 return Some(socket);
1130 }
1131 None
1132 }
1133
1134 EndpointInstance::All => {
1136 todo!("#186 Undescribed by author.")
1137 }
1138 }
1139 }
1140
1141 fn find_best_endpoint_socket(
1145 &self,
1146 endpoint: &Endpoint,
1147 exclude_sockets: &[ComInterfaceSocketUUID],
1148 ) -> Option<ComInterfaceSocketUUID> {
1149 if endpoint == &self.endpoint
1152 && let Some(socket) = self
1153 .find_known_endpoint_socket(&Endpoint::LOCAL, exclude_sockets)
1154 {
1155 return Some(socket);
1156 }
1157
1158 let matching_socket =
1160 self.find_known_endpoint_socket(endpoint, exclude_sockets);
1161
1162 if matching_socket.is_some() {
1164 matching_socket
1165 }
1166 else {
1168 let sockets = self.fallback_sockets.borrow();
1169 for (socket_uuid, _, _) in sockets.iter() {
1170 let socket = self.get_socket_by_uuid(socket_uuid);
1171 info!(
1172 "{}: Find best for {}: {} ({}); excluded:{}",
1173 self.endpoint,
1174 endpoint,
1175 socket_uuid,
1176 socket
1177 .lock()
1178 .unwrap()
1179 .direct_endpoint
1180 .clone()
1181 .map(|e| e.to_string())
1182 .unwrap_or("None".to_string()),
1183 exclude_sockets.contains(socket_uuid)
1184 );
1185 if !exclude_sockets.contains(socket_uuid) {
1186 return Some(socket_uuid.clone());
1187 }
1188 }
1189 None
1190 }
1191 }
1192
1193 fn get_outbound_receiver_groups(
1196 &self,
1197 block: &DXBBlock,
1200 mut exclude_sockets: Vec<ComInterfaceSocketUUID>,
1201 ) -> Option<Vec<(Option<ComInterfaceSocketUUID>, Vec<Endpoint>)>> {
1202 if let Some(receivers) = block.receivers() {
1203 if !receivers.is_empty() {
1204 let endpoint_sockets = receivers
1205 .iter()
1206 .map(|e| {
1207 if let Some(blacklist) =
1209 self.endpoint_sockets_blacklist.borrow().get(e)
1210 {
1211 exclude_sockets.extend(blacklist.iter().cloned());
1212 }
1213 let socket =
1214 self.find_best_endpoint_socket(e, &exclude_sockets);
1215 (socket, e)
1216 })
1217 .group_by(|(socket, _)| socket.clone())
1218 .into_iter()
1219 .map(|(socket, group)| {
1220 let endpoints = group
1221 .map(|(_, endpoint)| endpoint.clone())
1222 .collect::<Vec<_>>();
1223 (socket, endpoints)
1224 })
1225 .collect::<Vec<_>>();
1226
1227 Some(endpoint_sockets)
1228 } else {
1229 None
1230 }
1231 } else {
1232 None
1233 }
1234 }
1235
1236 pub fn _start_update_loop(self_rc: Rc<Self>) {
1242 if *self_rc.update_loop_running.borrow() {
1244 return;
1245 }
1246
1247 *self_rc.update_loop_running.borrow_mut() = true;
1249
1250 spawn_with_panic_notify(async move {
1251 while *self_rc.update_loop_running.borrow() {
1252 self_rc.update();
1253 sleep(Duration::from_millis(1)).await;
1254 }
1255 if let Some(sender) =
1256 self_rc.update_loop_stop_sender.borrow_mut().take()
1257 {
1258 sender.send(()).expect("Failed to send stop signal");
1259 }
1260 });
1261 }
1262
1263 pub fn update(&self) {
1268 self.update_interfaces();
1270
1271 self.update_sockets();
1273
1274 self.collect_incoming_data();
1276
1277 self.receive_incoming_blocks();
1279
1280 self.flush_outgoing_blocks();
1282 }
1283
1284 fn prepare_own_block(&self, mut block: DXBBlock) -> DXBBlock {
1287 let now = get_global_context().clone().time.lock().unwrap().now();
1289 block.routing_header.sender = self.endpoint.clone();
1290 block
1291 .block_header
1292 .flags_and_timestamp
1293 .set_creation_timestamp(now);
1294
1295 block.routing_header.distance = 1;
1297 block
1298 }
1299
1300 pub fn send_own_block(
1302 &self,
1303 mut block: DXBBlock,
1304 ) -> Result<(), Vec<Endpoint>> {
1305 block = self.prepare_own_block(block);
1306 self.block_handler.add_block_to_history(&block, None);
1308 self.send_block(block, vec![], false)
1309 }
1310
1311 pub async fn send_own_block_await_response(
1316 &self,
1317 block: DXBBlock,
1318 options: ResponseOptions,
1319 ) -> Vec<Result<Response, ResponseError>> {
1320 let context_id = block.block_header.context_id;
1321 let section_index = block.block_header.section_index;
1322
1323 let has_exact_receiver_count = block.has_exact_receiver_count();
1324 let receivers = block.get_receivers();
1325
1326 let res = self.send_own_block(block);
1327 let failed_endpoints = res.err().unwrap_or_default();
1328
1329 #[cfg(feature = "tokio_runtime")]
1331 yield_now().await;
1332
1333 let timeout = options
1334 .timeout
1335 .unwrap_or_default(self.options.default_receive_timeout);
1336
1337 if has_exact_receiver_count {
1339 if (options.resolution_strategy
1341 == ResponseResolutionStrategy::ReturnOnAnyError
1342 || options.resolution_strategy
1343 == ResponseResolutionStrategy::ReturnOnFirstResult)
1344 && !failed_endpoints.is_empty()
1345 {
1346 return receivers
1348 .iter()
1349 .map(|receiver| {
1350 if failed_endpoints.contains(receiver) {
1351 Err(ResponseError::NotReachable(receiver.clone()))
1352 } else {
1353 Err(ResponseError::EarlyAbort(receiver.clone()))
1354 }
1355 })
1356 .collect::<Vec<_>>();
1357 }
1358
1359 let mut responses = HashMap::new();
1361 let mut missing_response_count = receivers.len();
1362 for receiver in &receivers {
1363 responses.insert(
1364 receiver.clone(),
1365 if failed_endpoints.contains(receiver) {
1366 Err(ResponseError::NotReachable(receiver.clone()))
1367 } else {
1368 Err(ResponseError::NoResponseAfterTimeout(
1369 receiver.clone(),
1370 timeout,
1371 ))
1372 },
1373 );
1374 }
1375 missing_response_count -= failed_endpoints.len();
1377
1378 info!(
1379 "Waiting for responses from receivers {}",
1380 receivers
1381 .iter()
1382 .map(|e| e.to_string())
1383 .collect::<Vec<_>>()
1384 .join(",")
1385 );
1386
1387 let mut rx = self
1388 .block_handler
1389 .register_incoming_block_observer(context_id, section_index);
1390
1391 let res = task::timeout(timeout, async {
1392 while let Some(section) = rx.next().await {
1393 let mut received_response = false;
1394 let mut sender = section.get_sender();
1396 if let Some(response) = responses.get_mut(&sender) {
1398 if response.is_err() {
1400 *response = Ok(Response::ExactResponse(sender.clone(), section));
1401 missing_response_count -= 1;
1402 info!("Received expected response from {sender}");
1403 received_response = true;
1404 }
1405 else {
1407 error!("Received multiple responses from the same sender: {sender}");
1408 }
1409 }
1410 else if let Some(response) = responses.get_mut(&sender.any_instance_endpoint()) {
1412 info!("Received resolved response from {} -> {}", &sender, &sender.any_instance_endpoint());
1413 sender = sender.any_instance_endpoint();
1414 if response.is_err() {
1416 *response = Ok(Response::ResolvedResponse(sender.clone(), section));
1417 missing_response_count -= 1;
1418 received_response = true;
1419 }
1420 else {
1422 info!("Received multiple resolved responses from the {}", &sender);
1423 }
1424 }
1425 else {
1427 error!("Received response from unexpected sender: {}", &sender);
1428 }
1429
1430 if received_response && options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1432 for (receiver, response) in responses.iter_mut() {
1434 if receiver != &sender {
1435 *response = Err(ResponseError::EarlyAbort(receiver.clone()));
1436 }
1437 }
1438 break;
1439 }
1440
1441 if missing_response_count == 0 {
1443 break;
1444 }
1445 }
1446 }).await;
1447
1448 if res.is_err() {
1449 error!("Timeout waiting for responses");
1450 }
1451
1452 responses.into_values().collect::<Vec<_>>()
1454 }
1455 else {
1457 let mut responses = vec![];
1458
1459 let res = task::timeout(timeout, async {
1460 let mut rx = self.block_handler.register_incoming_block_observer(context_id, section_index);
1461 while let Some(section) = rx.next().await {
1462 let sender = section.get_sender();
1464 info!("Received response from {sender}");
1465 responses.push(Ok(Response::UnspecifiedResponse(section)));
1467
1468 if options.resolution_strategy == ResponseResolutionStrategy::ReturnOnFirstResult {
1470 break;
1471 }
1472 }
1473 }).await;
1474
1475 if res.is_err() {
1476 info!("Timeout waiting for responses");
1477 }
1478
1479 responses
1480 }
1481 }
1482
1483 pub fn send_block(
1490 &self,
1491 mut block: DXBBlock,
1492 exclude_sockets: Vec<ComInterfaceSocketUUID>,
1493 forked: bool,
1494 ) -> Result<(), Vec<Endpoint>> {
1495 let outbound_receiver_groups =
1496 self.get_outbound_receiver_groups(&block, exclude_sockets);
1497
1498 if outbound_receiver_groups.is_none() {
1499 error!("No outbound receiver groups found for block");
1500 return Err(vec![]);
1501 }
1502
1503 let outbound_receiver_groups = outbound_receiver_groups.unwrap();
1504
1505 let mut unreachable_endpoints = vec![];
1506
1507 let mut fork_count = if forked || outbound_receiver_groups.len() > 1 {
1512 Some(0)
1513 } else {
1514 None
1515 };
1516
1517 block.set_bounce_back(false);
1518
1519 for (receiver_socket, endpoints) in outbound_receiver_groups {
1520 if let Some(socket_uuid) = receiver_socket {
1521 self.send_block_to_endpoints_via_socket(
1522 block.clone(),
1523 &socket_uuid,
1524 &endpoints,
1525 fork_count,
1526 );
1527 } else {
1528 error!("{}: cannot send block, no receiver sockets found for endpoints {:?}", self.endpoint, endpoints.iter().map(|e| e.to_string()).collect::<Vec<_>>());
1529 unreachable_endpoints.extend(endpoints);
1530 }
1531 if let Some(count) = fork_count {
1533 fork_count = Some(count + 1);
1534 }
1535 }
1536
1537 if !unreachable_endpoints.is_empty() {
1538 return Err(unreachable_endpoints);
1539 }
1540 Ok(())
1541 }
1542
1543 fn send_block_to_endpoints_via_socket(
1546 &self,
1547 mut block: DXBBlock,
1548 socket_uuid: &ComInterfaceSocketUUID,
1549 endpoints: &[Endpoint],
1550 fork_count: Option<usize>,
1552 ) {
1553 block.set_receivers(endpoints);
1554
1555 if block.is_bounce_back() {
1558 block.routing_header.distance -= 2;
1559 }
1560
1561 match block.block_header.flags_and_timestamp.block_type() {
1563 BlockType::Trace | BlockType::TraceBack => {
1564 let distance = block.routing_header.distance;
1565 let new_fork_nr = self.calculate_fork_nr(&block, fork_count);
1566 let bounce_back = block.is_bounce_back();
1567
1568 self.add_hop_to_block_trace_data(
1569 &mut block,
1570 NetworkTraceHop {
1571 endpoint: self.endpoint.clone(),
1572 distance,
1573 socket: NetworkTraceHopSocket::new(
1574 self.get_com_interface_from_socket_uuid(
1575 socket_uuid,
1576 )
1577 .borrow_mut()
1578 .get_properties(),
1579 socket_uuid.clone(),
1580 ),
1581 direction: NetworkTraceHopDirection::Outgoing,
1582 fork_nr: new_fork_nr,
1583 bounce_back,
1584 },
1585 );
1586 }
1587 _ => {}
1588 }
1589
1590 let socket = self.get_socket_by_uuid(socket_uuid);
1591 let mut socket_ref = socket.lock().unwrap();
1592
1593 let is_broadcast = endpoints
1594 .iter()
1595 .any(|e| e == &Endpoint::ANY_ALL_INSTANCES || e == &Endpoint::ANY);
1596
1597 if is_broadcast
1598 && let Some(direct_endpoint) = &socket_ref.direct_endpoint
1599 && (direct_endpoint == &self.endpoint
1600 || direct_endpoint == &Endpoint::LOCAL)
1601 {
1602 return;
1603 }
1604
1605 match &block.to_bytes() {
1606 Ok(bytes) => {
1607 info!(
1608 "Sending block to socket {}: {}",
1609 socket_uuid,
1610 endpoints.iter().map(|e| e.to_string()).join(", ")
1611 );
1612
1613 socket_ref.queue_outgoing_block(bytes);
1615 }
1616 Err(err) => {
1617 error!("Failed to convert block to bytes: {err:?}");
1618 }
1619 }
1620 }
1621
1622 fn update_interfaces(&self) {
1625 let mut to_remove = Vec::new();
1626 for (interface, _) in self.interfaces.borrow().values() {
1627 let uuid = interface.borrow().get_uuid().clone();
1628 let state = interface.borrow().get_state();
1629
1630 if state.is_destroyed() {
1634 info!("Destroying interface on the ComHub {uuid}");
1635 to_remove.push(uuid);
1636 } else if state.is_not_connected()
1637 && interface.borrow_mut().get_properties().shall_reconnect()
1638 {
1639 let interface_rc = interface.clone();
1642 let mut interface = interface.borrow_mut();
1643
1644 let already_connecting =
1645 interface.get_state() == ComInterfaceState::Connecting;
1646
1647 if !already_connecting {
1648 let config = interface.get_properties_mut();
1649
1650 let reconnect_now = match &config.reconnection_config {
1651 ReconnectionConfig::InstantReconnect => true,
1652 ReconnectionConfig::ReconnectWithTimeout { timeout } => {
1653 ReconnectionConfig::check_reconnect_timeout(
1654 config.close_timestamp,
1655 timeout,
1656 )
1657 }
1658 ReconnectionConfig::ReconnectWithTimeoutAndAttempts {
1659 timeout,
1660 attempts,
1661 } => {
1662 let max_attempts = attempts;
1663
1664 let attempts = config.reconnect_attempts.unwrap_or(0);
1666 let attempts = attempts + 1;
1667 if attempts > *max_attempts {
1668 to_remove.push(uuid.clone());
1669 return;
1670 }
1671
1672 config.reconnect_attempts = Some(attempts);
1673
1674 ReconnectionConfig::check_reconnect_timeout(
1675 config.close_timestamp,
1676 timeout,
1677 )
1678 }
1679 ReconnectionConfig::NoReconnect => false,
1680 };
1681 if reconnect_now {
1682 debug!("Reconnecting interface {uuid}");
1683 interface.set_state(ComInterfaceState::Connecting);
1684 spawn_with_panic_notify(async move {
1685 let interface = interface_rc.clone();
1686 let mut interface = interface.borrow_mut();
1687
1688 let config = interface.get_properties_mut();
1689 config.close_timestamp = None;
1690
1691 let current_attempts =
1692 config.reconnect_attempts.unwrap_or(0);
1693 config.reconnect_attempts =
1694 Some(current_attempts + 1);
1695
1696 let res = interface.handle_open().await;
1697 if res {
1698 interface
1699 .set_state(ComInterfaceState::Connected);
1700 } else {
1702 interface
1703 .set_state(ComInterfaceState::NotConnected);
1704 }
1705 });
1706 } else {
1707 debug!("Not reconnecting interface {uuid}");
1708 }
1709 }
1710 }
1711 }
1712
1713 for uuid in to_remove {
1714 self.cleanup_interface(uuid);
1715 }
1716 }
1717
1718 fn update_sockets(&self) {
1721 let mut new_sockets = Vec::new();
1722 let mut deleted_sockets = Vec::new();
1723 let mut registered_sockets = Vec::new();
1724
1725 for (interface, priority) in self.interfaces.borrow().values() {
1726 let socket_updates = interface.clone().borrow().get_sockets();
1727 let mut socket_updates = socket_updates.lock().unwrap();
1728
1729 registered_sockets
1730 .extend(socket_updates.socket_registrations.drain(..));
1731 new_sockets.extend(
1732 socket_updates.new_sockets.drain(..).map(|s| (s, *priority)),
1733 );
1734 deleted_sockets.extend(socket_updates.deleted_sockets.drain(..));
1735 }
1736
1737 for (socket, priority) in new_sockets {
1738 self.add_socket(socket.clone(), priority);
1739 }
1740 for socket_uuid in deleted_sockets {
1741 self.delete_socket(&socket_uuid);
1742 }
1743 for (socket_uuid, distance, endpoint) in registered_sockets {
1744 let socket = self.get_socket_by_uuid(&socket_uuid);
1745 self.register_socket_endpoint(socket, endpoint.clone(), distance)
1746 .unwrap_or_else(|e| {
1747 error!(
1748 "Failed to register socket {socket_uuid} for endpoint {endpoint} {e:?}"
1749 );
1750 });
1751 }
1752 }
1753
1754 fn collect_incoming_data(&self) {
1757 for (socket, _) in self.sockets.borrow().values() {
1759 let mut socket_ref = socket.lock().unwrap();
1760 socket_ref.collect_incoming_data();
1761 }
1762 }
1763
1764 fn receive_incoming_blocks(&self) {
1767 let mut blocks = vec![];
1768 for (socket, _) in self.sockets.borrow().values() {
1770 let mut socket_ref = socket.lock().unwrap();
1771 let uuid = socket_ref.uuid.clone();
1772 let block_queue = socket_ref.get_incoming_block_queue();
1773 blocks.push((uuid, block_queue.drain(..).collect::<Vec<_>>()));
1774 }
1775
1776 for (uuid, blocks) in blocks {
1777 for block in blocks.iter() {
1778 self.receive_block(block, uuid.clone());
1779 }
1780 }
1781 }
1782
1783 fn flush_outgoing_blocks(&self) {
1785 let interfaces = self.interfaces.borrow();
1786 for (interface, _) in interfaces.values() {
1787 com_interface::flush_outgoing_blocks(interface.clone());
1788 }
1789 }
1790}
1791
1792#[derive(Default, PartialEq, Debug)]
1793pub enum ResponseResolutionStrategy {
1794 #[default]
1801 ReturnAfterAllSettled,
1802
1803 ReturnOnAnyError,
1811
1812 ReturnOnFirstResponse,
1815
1816 ReturnOnFirstResult,
1819}
1820
1821#[derive(Default, Debug)]
1822pub enum ResponseTimeout {
1823 #[default]
1824 Default,
1825 Custom(Duration),
1826}
1827
1828impl ResponseTimeout {
1829 pub fn unwrap_or_default(self, default: Duration) -> Duration {
1830 match self {
1831 ResponseTimeout::Default => default,
1832 ResponseTimeout::Custom(timeout) => timeout,
1833 }
1834 }
1835}
1836
1837#[derive(Default, Debug)]
1838pub struct ResponseOptions {
1839 pub resolution_strategy: ResponseResolutionStrategy,
1840 pub timeout: ResponseTimeout,
1841}
1842
1843impl ResponseOptions {
1844 pub fn new_with_resolution_strategy(
1845 resolution_strategy: ResponseResolutionStrategy,
1846 ) -> Self {
1847 Self {
1848 resolution_strategy,
1849 ..ResponseOptions::default()
1850 }
1851 }
1852
1853 pub fn new_with_timeout(timeout: Duration) -> Self {
1854 Self {
1855 timeout: ResponseTimeout::Custom(timeout),
1856 ..ResponseOptions::default()
1857 }
1858 }
1859}
1860
1861#[derive(Debug)]
1862pub enum Response {
1863 ExactResponse(Endpoint, IncomingSection),
1864 ResolvedResponse(Endpoint, IncomingSection),
1865 UnspecifiedResponse(IncomingSection),
1866}
1867
1868impl Response {
1869 pub fn take_incoming_section(self) -> IncomingSection {
1870 match self {
1871 Response::ExactResponse(_, section) => section,
1872 Response::ResolvedResponse(_, section) => section,
1873 Response::UnspecifiedResponse(section) => section,
1874 }
1875 }
1876}
1877
1878#[derive(Debug)]
1879pub enum ResponseError {
1880 NoResponseAfterTimeout(Endpoint, Duration),
1881 NotReachable(Endpoint),
1882 EarlyAbort(Endpoint),
1883}
1884
1885impl Display for ResponseError {
1886 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1887 match self {
1888 ResponseError::NoResponseAfterTimeout(endpoint, duration) => {
1889 write!(
1890 f,
1891 "No response after timeout ({}s) for endpoint {}",
1892 duration.as_secs(),
1893 endpoint
1894 )
1895 }
1896 ResponseError::NotReachable(endpoint) => {
1897 write!(f, "Endpoint {endpoint} is not reachable")
1898 }
1899 ResponseError::EarlyAbort(endpoint) => {
1900 write!(f, "Early abort for endpoint {endpoint}")
1901 }
1902 }
1903 }
1904}