discv5/socket/
send.rs

1//! This is a standalone task that encodes and sends Discv5 UDP packets
2use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor};
3use std::{net::SocketAddr, sync::Arc};
4use tokio::{
5    net::UdpSocket,
6    sync::{mpsc, oneshot},
7};
8use tracing::{debug, error, trace, warn};
9
10pub struct OutboundPacket {
11    /// The destination node address
12    pub node_address: NodeAddress,
13    /// The packet to be encoded.
14    pub packet: Packet,
15}
16
17/// The main task that handles outbound UDP packets.
18pub(crate) struct SendHandler {
19    /// The UDP send socket for IPv4.
20    send_ipv4: Option<Arc<UdpSocket>>,
21    /// The UDP send socket for IPv6.
22    send_ipv6: Option<Arc<UdpSocket>>,
23    /// The channel to respond to send requests.
24    handler_recv: mpsc::Receiver<OutboundPacket>,
25    /// Exit channel to shutdown the handler.
26    exit: oneshot::Receiver<()>,
27}
28
29enum Error {
30    Io(std::io::Error),
31    SocketMismatch,
32}
33
34impl SendHandler {
35    /// Spawns the `SendHandler` on a provided executor.
36    /// This returns the sending channel to process `OutboundPacket`'s and an exit channel to
37    /// shutdown the handler.
38    pub(crate) fn spawn(
39        executor: Box<dyn Executor>,
40        send_ipv4: Option<Arc<UdpSocket>>,
41        send_ipv6: Option<Arc<UdpSocket>>,
42    ) -> (mpsc::Sender<OutboundPacket>, oneshot::Sender<()>) {
43        let (exit_send, exit) = oneshot::channel();
44        let (handler_send, handler_recv) = mpsc::channel(30);
45
46        let mut send_handler = SendHandler {
47            send_ipv4,
48            send_ipv6,
49            handler_recv,
50            exit,
51        };
52
53        // start the handler
54        executor.spawn(Box::pin(async move {
55            debug!("Send handler starting");
56            send_handler.start().await;
57        }));
58        (handler_send, exit_send)
59    }
60
61    /// The main future driving the send handler. This will shutdown when the exit future is fired.
62    async fn start(&mut self) {
63        loop {
64            tokio::select! {
65                Some(packet) = self.handler_recv.recv() => {
66                    let encoded_packet = packet.packet.encode(&packet.node_address.node_id);
67                    if encoded_packet.len() > MAX_PACKET_SIZE {
68                        warn!(
69                            size = encoded_packet.len(),
70                            max = MAX_PACKET_SIZE,
71                            "Sending packet larger than max size"
72                        );
73                    }
74                    let addr = &packet.node_address.socket_addr;
75                    if let Err(e) = self.send(&encoded_packet, addr).await {
76                        match e {
77                            Error::Io(e) => {
78                                trace!(%addr, error = %e, "Could not send packet.");
79                            },
80                            Error::SocketMismatch => {
81                                error!(%addr, "Socket mismatch attempting to send a packet.")
82                            }
83                        }
84                    } else {
85                        METRICS.add_sent_bytes(encoded_packet.len());
86                    }
87                }
88                _ = &mut self.exit => {
89                    debug!("Send handler shutdown");
90                    return;
91                }
92            }
93        }
94    }
95
96    async fn send(&self, encoded_packet: &[u8], socket_addr: &SocketAddr) -> Result<usize, Error> {
97        let socket = match socket_addr {
98            SocketAddr::V4(_) => {
99                if let Some(socket) = self.send_ipv4.as_ref() {
100                    socket
101                } else {
102                    return Err(Error::SocketMismatch);
103                }
104            }
105            SocketAddr::V6(_) => {
106                if let Some(socket) = self.send_ipv6.as_ref() {
107                    socket
108                } else {
109                    return Err(Error::SocketMismatch);
110                }
111            }
112        };
113
114        socket
115            .send_to(encoded_packet, socket_addr)
116            .await
117            .map_err(Error::Io)
118    }
119}