kadcast/transport/
sockets.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::io;
8use std::net::SocketAddr;
9use std::time::Duration;
10
11use tokio::net::UdpSocket;
12use tokio::runtime::Handle;
13use tokio::task::block_in_place;
14use tokio::time::{self, timeout, Interval};
15use tracing::{info, warn};
16
17use super::encoding::Configurable;
18use crate::config::NetworkConfig;
19
20const MIN_RETRY_COUNT: u8 = 1;
21
22pub(super) struct MultipleOutSocket {
23    ipv4: UdpSocket,
24    ipv6: UdpSocket,
25    udp_backoff_timeout: Option<Interval>,
26    retry_count: u8,
27    udp_send_retry_interval: Duration,
28}
29
30impl Configurable for MultipleOutSocket {
31    type TConf = NetworkConfig;
32    fn default_configuration() -> Self::TConf {
33        NetworkConfig::default()
34    }
35    fn configure(conf: &Self::TConf) -> Self {
36        let udp_backoff_timeout =
37            conf.udp_send_backoff_timeout.map(time::interval);
38        let retry_count = {
39            match conf.udp_send_retry_count > MIN_RETRY_COUNT {
40                true => conf.udp_send_retry_count,
41                _ => MIN_RETRY_COUNT,
42            }
43        };
44        let udp_send_retry_interval = conf.udp_send_retry_interval;
45
46        let ipv4 = block_in_place(move || {
47            Handle::current()
48                .block_on(async move { UdpSocket::bind("0.0.0.0:0").await })
49        })
50        .unwrap();
51        let ipv6 = block_in_place(move || {
52            Handle::current()
53                .block_on(async move { UdpSocket::bind("[::]:0").await })
54        })
55        .unwrap();
56        MultipleOutSocket {
57            ipv4,
58            ipv6,
59            udp_backoff_timeout,
60            retry_count,
61            udp_send_retry_interval,
62        }
63    }
64}
65
66impl MultipleOutSocket {
67    pub(super) async fn send(
68        &mut self,
69        data: &[u8],
70        remote_addr: &SocketAddr,
71    ) -> io::Result<()> {
72        if let Some(sleep) = &mut self.udp_backoff_timeout {
73            sleep.tick().await;
74        }
75        let max_retry = self.retry_count;
76
77        for i in 1..=max_retry {
78            let send_fn = match remote_addr.is_ipv4() {
79                true => self.ipv4.send_to(data, *remote_addr),
80                false => self.ipv6.send_to(data, *remote_addr),
81            };
82
83            let send = timeout(self.udp_send_retry_interval, send_fn)
84                .await
85                .map_err(|_| io::Error::new(io::ErrorKind::Other, "TIMEOUT"));
86
87            match send {
88                Ok(Ok(_)) => {
89                    if i > 1 {
90                        info!("Message sent, recovered from previous error");
91                    }
92                    return Ok(());
93                }
94                Ok(Err(e)) | Err(e) => {
95                    if i < max_retry {
96                        warn!("Unable to send msg, temptative {i}/{max_retry} - {e}");
97                        tokio::time::sleep(self.udp_send_retry_interval).await
98                    } else {
99                        return Err(e);
100                    }
101                }
102            }
103        }
104        unreachable!()
105    }
106}
107
#[cfg(test)]
mod tests {
    use tracing::error;

    use super::*;
    use crate::peer::PeerNode;
    use crate::tests::Result;

    /// Stress test: pushes a large number of datagrams through a socket
    /// built from the default configuration, logging (not failing on) send
    /// errors.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_send_peers() -> Result<()> {
        // Generate a subscriber with the desired log level.
        let subscriber = tracing_subscriber::fmt::Subscriber::builder()
            .with_max_level(tracing::Level::DEBUG)
            .finish();
        // Install it as the global default so it is used by all threads for
        // the remainder of the program. Installation fails when another test
        // in the same binary has already set a global subscriber; that is
        // harmless here, so the result is deliberately ignored instead of
        // panicking.
        let _ = tracing::subscriber::set_global_default(subscriber);

        let mut socket = MultipleOutSocket::configure(
            &MultipleOutSocket::default_configuration(),
        );
        let data = [0u8; 1000];
        let root = PeerNode::generate("192.168.0.1:666", 0)?;
        let target = root.as_peer_info().to_socket_address();

        for _ in 0..1000 * 1000 {
            if let Err(e) = socket.send(&data, &target).await {
                error!("{e}");
            }
        }
        Ok(())
    }
}