1use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor};
3use std::{net::SocketAddr, sync::Arc};
4use tokio::{
5 net::UdpSocket,
6 sync::{mpsc, oneshot},
7};
8use tracing::{debug, error, trace, warn};
9
10pub struct OutboundPacket {
11 pub node_address: NodeAddress,
13 pub packet: Packet,
15}
16
17pub(crate) struct SendHandler {
19 send_ipv4: Option<Arc<UdpSocket>>,
21 send_ipv6: Option<Arc<UdpSocket>>,
23 handler_recv: mpsc::Receiver<OutboundPacket>,
25 exit: oneshot::Receiver<()>,
27}
28
29enum Error {
30 Io(std::io::Error),
31 SocketMismatch,
32}
33
34impl SendHandler {
35 pub(crate) fn spawn(
39 executor: Box<dyn Executor>,
40 send_ipv4: Option<Arc<UdpSocket>>,
41 send_ipv6: Option<Arc<UdpSocket>>,
42 ) -> (mpsc::Sender<OutboundPacket>, oneshot::Sender<()>) {
43 let (exit_send, exit) = oneshot::channel();
44 let (handler_send, handler_recv) = mpsc::channel(30);
45
46 let mut send_handler = SendHandler {
47 send_ipv4,
48 send_ipv6,
49 handler_recv,
50 exit,
51 };
52
53 executor.spawn(Box::pin(async move {
55 debug!("Send handler starting");
56 send_handler.start().await;
57 }));
58 (handler_send, exit_send)
59 }
60
61 async fn start(&mut self) {
63 loop {
64 tokio::select! {
65 Some(packet) = self.handler_recv.recv() => {
66 let encoded_packet = packet.packet.encode(&packet.node_address.node_id);
67 if encoded_packet.len() > MAX_PACKET_SIZE {
68 warn!(
69 size = encoded_packet.len(),
70 max = MAX_PACKET_SIZE,
71 "Sending packet larger than max size"
72 );
73 }
74 let addr = &packet.node_address.socket_addr;
75 if let Err(e) = self.send(&encoded_packet, addr).await {
76 match e {
77 Error::Io(e) => {
78 trace!(%addr, error = %e, "Could not send packet.");
79 },
80 Error::SocketMismatch => {
81 error!(%addr, "Socket mismatch attempting to send a packet.")
82 }
83 }
84 } else {
85 METRICS.add_sent_bytes(encoded_packet.len());
86 }
87 }
88 _ = &mut self.exit => {
89 debug!("Send handler shutdown");
90 return;
91 }
92 }
93 }
94 }
95
96 async fn send(&self, encoded_packet: &[u8], socket_addr: &SocketAddr) -> Result<usize, Error> {
97 let socket = match socket_addr {
98 SocketAddr::V4(_) => {
99 if let Some(socket) = self.send_ipv4.as_ref() {
100 socket
101 } else {
102 return Err(Error::SocketMismatch);
103 }
104 }
105 SocketAddr::V6(_) => {
106 if let Some(socket) = self.send_ipv6.as_ref() {
107 socket
108 } else {
109 return Err(Error::SocketMismatch);
110 }
111 }
112 };
113
114 socket
115 .send_to(encoded_packet, socket_addr)
116 .await
117 .map_err(Error::Io)
118 }
119}