udp2p_transport/handler.rs
1#![allow(dead_code)]
2use std::sync::mpsc::Sender;
3use udp2p_protocol::protocol::{AckMessage, Message, KadMessage, Packet, Header, InnerKey};
4use std::net::{SocketAddr, UdpSocket};
5use udp2p_gd_udp::gd_udp::GDUdp;
6use std::collections::HashMap;
7use udp2p_utils::utils::ByteRep;
8use log::info;
9
10/// The core struct of the handler module
11/// Contains an outgoing message sender
12/// an incoming acknowldgement sender
13/// a hashmap of pending message packets
14/// a kad sender for sending messages to a kademlia instance
15/// and a gossip sender for sending messages to a gossip instance
16///
17/// TODO: make kad_tx and gossip_tx optional
18pub struct MessageHandler {
19 om_tx: Sender<(SocketAddr, Message)>,
20 ia_tx: Sender<AckMessage>,
21 pending: HashMap<InnerKey, HashMap<usize, Packet>>,
22 kad_tx: Sender<(SocketAddr, KadMessage)>,
23 gossip_tx: Sender<(SocketAddr, Message)>,
24}
25
26impl MessageHandler {
27 /// Creates a new mesasge handler instance
28 ///
29 /// # Arguments
30 ///
31 /// * om_tx - an outgoing message sender that sends a tuple of a SocketAddress (destination) and Message to send to the transport layer
32 /// * ia_tx - an incoming acknowledgement sender that sends an acknowledgement message to the transport (or GDUDP) layer
33 /// * pending - a hashmap containing a message key as the key and a hashmap of derived packets to store the packets until all are received and message can be reassembled
34 /// * kad_tx - a sender to send a tuple of the sender and the kad message to a kademlia dht
35 /// * gossip_tx - a sender to send a tuple of the sender address and the message to the gossip instance
36 ///
37 pub fn new(
38 om_tx: Sender<(SocketAddr, Message)>,
39 ia_tx: Sender<AckMessage>,
40 pending: HashMap<InnerKey, HashMap<usize, Packet>>,
41 kad_tx: Sender<(SocketAddr, KadMessage)>,
42 gossip_tx: Sender<(SocketAddr, Message)>
43 ) -> MessageHandler {
44 MessageHandler {
45 om_tx,
46 ia_tx,
47 pending,
48 kad_tx,
49 gossip_tx,
50 }
51
52 }
53
54 /// Receives a message to the UDP socket buffer and processes the packet
55 ///
56 /// # Arguments
57 ///
58 /// * sock - the UDP socket to read messages into the buffer from
59 /// * buf - the buffer to write incoming bytes to
60 /// * local - the local socket address.
61 pub fn recv_msg(&mut self, sock: &UdpSocket, buf: &mut [u8], local: SocketAddr) {
62 let res = sock.recv_from(buf);
63 match res {
64 Ok((amt, src)) => {
65 info!("Received {:?} bytes from {:?}", amt, src);
66 if let Some(packet) = self.process_packet(local, buf.to_vec(), amt, src) {
67 self.insert_packet(packet, src)
68 }
69 }
70 Err(_) => {}
71 }
72 }
73
74 /// Processes the packet, sends an acknowledgement if requested, and returns the packet
75 ///
76 /// # Arguments
77 ///
78 /// * local - the local nodes socket address
79 /// * buf - a vector of u8 bytes to write packet bytes to
80 /// * amt - the number of bytes received by the socket
81 /// * src - the sender of the message
82 ///
83 pub fn process_packet(&self, local: SocketAddr, buf: Vec<u8>, amt: usize, src: SocketAddr) -> Option<Packet> {
84 if let Some(packet) = Packet::from_bytes(&buf[..amt]){
85 if packet.ret == GDUdp::RETURN_RECEIPT {
86 let ack = AckMessage {
87 packet_id: packet.id,
88 packet_number: packet.n,
89 src: local.to_string().as_bytes().to_vec()
90 };
91 let header = Header::Ack;
92 let message = Message {
93 head: header,
94 msg: ack.as_bytes().unwrap()
95 };
96
97 if let Err(_) = self.om_tx.clone().send((src, message)) {
98 println!("Error sending ack message to transport thread");
99 }
100 }
101 return Some(packet)
102 }
103 None
104 }
105
106 /// Inserts a packet into the pending table, and checks if all the packets for the message they're dervied
107 /// from. If so it reassembles the message and calls handle_message
108 ///
109 /// # Arguments
110 ///
111 /// * packet - the packet received
112 /// * src - the sender of the packet
113 ///
114 pub fn insert_packet(&mut self, packet: Packet, src: SocketAddr) {
115 if let Some(map) = self.pending.clone().get_mut(&packet.id) {
116 map.entry(packet.n).or_insert(packet.clone());
117 self.pending.insert(packet.id, map.clone());
118 if map.len() == packet.total_n {
119 if let Some(message) = self.assemble_packets(packet.clone(), map.clone()) {
120 self.handle_message(message, src);
121 }
122 }
123 } else {
124 if packet.total_n == 1 {
125 let bytes = hex::decode(&packet.bytes).unwrap();
126 if let Some(message) = Message::from_bytes(&bytes) {
127 self.handle_message(message, src);
128 }
129 } else {
130 let mut map = HashMap::new();
131 map.insert(packet.n, packet.clone());
132 self.pending.insert(packet.id, map);
133 }
134 }
135 }
136
137 /// Assembles the packets and returns a message
138 ///
139 /// # Arguments
140 ///
141 /// * packet - the final packet received, used to get the total number of packets
142 /// * map - the map of all the bytes for the message that needs to be assembled
143 ///
144 fn assemble_packets(&self, packet: Packet, map: HashMap<usize, Packet>) -> Option<Message> {
145 let mut bytes = vec![];
146 (1..=packet.total_n)
147 .into_iter()
148 .for_each(|n| {
149 let converted = hex::decode(&map[&n].bytes.clone()).unwrap();
150 bytes.extend(converted)
151 });
152 Message::from_bytes(&bytes)
153 }
154
155 /// Handles and routes a message to the proper component
156 ///
157 /// # Arguments
158 ///
159 /// * message - the message to be routed
160 /// * src - the sender of the message
161 ///
162 fn handle_message(&self, message: Message, src: SocketAddr) {
163 match message.head {
164 Header::Request | Header::Response => {
165 if let Some(msg) = KadMessage::from_bytes(&message.msg) {
166 if let Err(_) = self.kad_tx.send((src, msg)) {
167 println!("Error sending to kad");
168 }
169 }
170 }
171 Header::Ack => {
172 if let Some(ack) = AckMessage::from_bytes(&message.msg) {
173 if let Err(_) = self.ia_tx.send(ack) {
174 println!("Error sending ack message")
175 }
176 }
177 }
178 Header::Gossip => {
179 if let Err(_) = self.gossip_tx.send((src, message)) {
180 println!("Error sending to gossip");
181 }
182 }
183 }
184 }
185}