1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use crate::{node_info::NodeAddress, packet::*, Executor};
use std::sync::Arc;
use tokio::{
net::UdpSocket,
sync::{mpsc, oneshot},
};
use tracing::{debug, trace, warn};
/// A packet queued for transmission, paired with the address of the node it is destined for.
pub struct OutboundPacket {
/// Destination of the packet. The send handler passes its `node_id` to the packet encoder
/// and uses its `socket_addr` as the UDP send target.
pub node_address: NodeAddress,
/// The packet that will be encoded and written to the socket.
pub packet: Packet,
}
/// State owned by the background task that encodes outbound packets and sends them over UDP.
pub(crate) struct SendHandler {
/// Shared UDP socket that encoded packets are sent on.
send: Arc<UdpSocket>,
/// Receiving half of the channel on which outbound packets arrive.
handler_recv: mpsc::Receiver<OutboundPacket>,
/// Exit channel; once it resolves the handler loop logs a shutdown message and returns.
exit: oneshot::Receiver<()>,
}
impl SendHandler {
pub(crate) fn spawn(
executor: Box<dyn Executor>,
send: Arc<UdpSocket>,
) -> (mpsc::Sender<OutboundPacket>, oneshot::Sender<()>) {
let (exit_send, exit) = oneshot::channel();
let (handler_send, handler_recv) = mpsc::channel(30);
let mut send_handler = SendHandler {
send,
handler_recv,
exit,
};
executor.spawn(Box::pin(async move {
debug!("Send handler starting");
send_handler.start().await;
}));
(handler_send, exit_send)
}
async fn start(&mut self) {
loop {
tokio::select! {
Some(packet) = self.handler_recv.recv() => {
let encoded_packet = packet.packet.encode(&packet.node_address.node_id);
if encoded_packet.len() > MAX_PACKET_SIZE {
warn!("Sending packet larger than max size: {} max: {}", encoded_packet.len(), MAX_PACKET_SIZE);
}
if let Err(e) = self.send.send_to(&encoded_packet, &packet.node_address.socket_addr).await {
trace!("Could not send packet. Error: {:?}", e);
}
}
_ = &mut self.exit => {
debug!("Send handler shutdown");
return;
}
}
}
}
}