kadcast/transport/
sockets.rs1use std::io;
8use std::net::SocketAddr;
9use std::time::Duration;
10
11use tokio::net::UdpSocket;
12use tokio::runtime::Handle;
13use tokio::task::block_in_place;
14use tokio::time::{self, timeout, Interval};
15use tracing::{info, warn};
16
17use super::encoding::Configurable;
18use crate::config::NetworkConfig;
19
20const MIN_RETRY_COUNT: u8 = 1;
21
22pub(super) struct MultipleOutSocket {
23 ipv4: UdpSocket,
24 ipv6: UdpSocket,
25 udp_backoff_timeout: Option<Interval>,
26 retry_count: u8,
27 udp_send_retry_interval: Duration,
28}
29
30impl Configurable for MultipleOutSocket {
31 type TConf = NetworkConfig;
32 fn default_configuration() -> Self::TConf {
33 NetworkConfig::default()
34 }
35 fn configure(conf: &Self::TConf) -> Self {
36 let udp_backoff_timeout =
37 conf.udp_send_backoff_timeout.map(time::interval);
38 let retry_count = {
39 match conf.udp_send_retry_count > MIN_RETRY_COUNT {
40 true => conf.udp_send_retry_count,
41 _ => MIN_RETRY_COUNT,
42 }
43 };
44 let udp_send_retry_interval = conf.udp_send_retry_interval;
45
46 let ipv4 = block_in_place(move || {
47 Handle::current()
48 .block_on(async move { UdpSocket::bind("0.0.0.0:0").await })
49 })
50 .unwrap();
51 let ipv6 = block_in_place(move || {
52 Handle::current()
53 .block_on(async move { UdpSocket::bind("[::]:0").await })
54 })
55 .unwrap();
56 MultipleOutSocket {
57 ipv4,
58 ipv6,
59 udp_backoff_timeout,
60 retry_count,
61 udp_send_retry_interval,
62 }
63 }
64}
65
66impl MultipleOutSocket {
67 pub(super) async fn send(
68 &mut self,
69 data: &[u8],
70 remote_addr: &SocketAddr,
71 ) -> io::Result<()> {
72 if let Some(sleep) = &mut self.udp_backoff_timeout {
73 sleep.tick().await;
74 }
75 let max_retry = self.retry_count;
76
77 for i in 1..=max_retry {
78 let send_fn = match remote_addr.is_ipv4() {
79 true => self.ipv4.send_to(data, *remote_addr),
80 false => self.ipv6.send_to(data, *remote_addr),
81 };
82
83 let send = timeout(self.udp_send_retry_interval, send_fn)
84 .await
85 .map_err(|_| io::Error::new(io::ErrorKind::Other, "TIMEOUT"));
86
87 match send {
88 Ok(Ok(_)) => {
89 if i > 1 {
90 info!("Message sent, recovered from previous error");
91 }
92 return Ok(());
93 }
94 Ok(Err(e)) | Err(e) => {
95 if i < max_retry {
96 warn!("Unable to send msg, temptative {i}/{max_retry} - {e}");
97 tokio::time::sleep(self.udp_send_retry_interval).await
98 } else {
99 return Err(e);
100 }
101 }
102 }
103 }
104 unreachable!()
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use tracing::error;
111
112 use super::*;
113 use crate::peer::PeerNode;
114 use crate::tests::Result;
115 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
116 async fn test_send_peers() -> Result<()> {
117 let subscriber = tracing_subscriber::fmt::Subscriber::builder()
119 .with_max_level(tracing::Level::DEBUG)
120 .finish();
121 tracing::subscriber::set_global_default(subscriber)
126 .expect("Failed on subscribe tracing");
127 let mut socket = MultipleOutSocket::configure(
128 &MultipleOutSocket::default_configuration(),
129 );
130 let data = [0u8; 1000];
131 let root = PeerNode::generate("192.168.0.1:666", 0)?;
132 let target = root.as_peer_info().to_socket_address();
133
134 for _ in 0..1000 * 1000 {
135 if let Err(e) = socket.send(&data, &target).await {
136 error!("{e}");
137 }
138 }
139 Ok(())
140 }
141}