udp2p_transport/
handler.rs

1#![allow(dead_code)]
2use std::sync::mpsc::Sender;
3use udp2p_protocol::protocol::{AckMessage, Message, KadMessage, Packet, Header, InnerKey};
4use std::net::{SocketAddr, UdpSocket};
5use udp2p_gd_udp::gd_udp::GDUdp;
6use std::collections::HashMap;
7use udp2p_utils::utils::ByteRep;
8use log::info;
9
10/// The core struct of the handler module
11/// Contains an outgoing message sender
12/// an incoming acknowldgement sender
13/// a hashmap of pending message packets
14/// a kad sender for sending messages to a kademlia instance
15/// and a gossip sender for sending messages to a gossip instance
16/// 
17/// TODO: make kad_tx and gossip_tx optional
18pub struct MessageHandler {
19    om_tx: Sender<(SocketAddr, Message)>,
20    ia_tx: Sender<AckMessage>,
21    pending: HashMap<InnerKey, HashMap<usize, Packet>>,
22    kad_tx: Sender<(SocketAddr, KadMessage)>,
23    gossip_tx: Sender<(SocketAddr, Message)>,
24}
25
26impl MessageHandler {
27    /// Creates a new mesasge handler instance
28    /// 
29    /// # Arguments
30    /// 
31    /// * om_tx - an outgoing message sender that sends a tuple of a SocketAddress (destination) and Message to send to the transport layer
32    /// * ia_tx - an incoming acknowledgement sender that sends an acknowledgement message to the transport (or GDUDP) layer
33    /// * pending - a hashmap containing a message key as the key and a hashmap of derived packets to store the packets until all are received and message can be reassembled
34    /// * kad_tx - a sender to send a tuple of the sender and the kad message to a kademlia dht
35    /// * gossip_tx - a sender to send a tuple of the sender address and the message to the gossip instance
36    /// 
37    pub fn new(
38        om_tx: Sender<(SocketAddr, Message)>,
39        ia_tx: Sender<AckMessage>,
40        pending: HashMap<InnerKey, HashMap<usize, Packet>>,
41        kad_tx: Sender<(SocketAddr, KadMessage)>,
42        gossip_tx: Sender<(SocketAddr, Message)>
43    ) -> MessageHandler {
44        MessageHandler {
45            om_tx,
46            ia_tx,
47            pending,
48            kad_tx,
49            gossip_tx,
50        }
51
52    }
53
54    /// Receives a message to the UDP socket buffer and processes the packet
55    /// 
56    /// # Arguments
57    /// 
58    /// * sock - the UDP socket to read messages into the buffer from
59    /// * buf - the buffer to write incoming bytes to
60    /// * local - the local socket address.
61    pub fn recv_msg(&mut self, sock: &UdpSocket, buf: &mut [u8], local: SocketAddr) {
62        let res = sock.recv_from(buf);
63        match res {
64            Ok((amt, src)) => {
65                info!("Received {:?} bytes from {:?}", amt, src);
66                if let Some(packet) = self.process_packet(local, buf.to_vec(), amt, src) {
67                    self.insert_packet(packet, src)
68                }
69            }
70            Err(_) => {}
71        }
72    }
73
74    /// Processes the packet, sends an acknowledgement if requested, and returns the packet
75    /// 
76    /// # Arguments
77    /// 
78    /// * local - the local nodes socket address
79    /// * buf - a vector of u8 bytes to write packet bytes to
80    /// * amt - the number of bytes received by the socket
81    /// * src - the sender of the message
82    /// 
83    pub fn process_packet(&self, local: SocketAddr, buf: Vec<u8>, amt: usize, src: SocketAddr) -> Option<Packet> {
84        if let Some(packet) = Packet::from_bytes(&buf[..amt]){
85            if packet.ret == GDUdp::RETURN_RECEIPT {
86                let ack = AckMessage {
87                    packet_id: packet.id,
88                    packet_number: packet.n,
89                    src: local.to_string().as_bytes().to_vec()
90                };
91                let header = Header::Ack;
92                let message = Message {
93                    head: header,
94                    msg: ack.as_bytes().unwrap()
95                };
96
97                if let Err(_) = self.om_tx.clone().send((src, message)) {
98                    println!("Error sending ack message to transport thread");
99                }
100            }
101        return Some(packet)
102    }
103        None 
104    }
105
106    /// Inserts a packet into the pending table, and checks if all the packets for the message they're dervied
107    /// from. If so it reassembles the message and calls handle_message
108    /// 
109    /// # Arguments
110    /// 
111    /// * packet - the packet received
112    /// * src - the sender of the packet
113    /// 
114    pub fn insert_packet(&mut self, packet: Packet, src: SocketAddr) {
115        if let Some(map) = self.pending.clone().get_mut(&packet.id) {
116            map.entry(packet.n).or_insert(packet.clone());
117            self.pending.insert(packet.id, map.clone());
118            if map.len() == packet.total_n {
119                if let Some(message) = self.assemble_packets(packet.clone(), map.clone()) {
120                    self.handle_message(message, src);
121                }
122            }
123        } else {
124            if packet.total_n == 1 {
125                let bytes = hex::decode(&packet.bytes).unwrap();
126                if let Some(message) = Message::from_bytes(&bytes) {
127                    self.handle_message(message, src);
128                }
129            } else {
130                let mut map = HashMap::new();
131                map.insert(packet.n, packet.clone());
132                self.pending.insert(packet.id, map);
133            }
134        }
135    }
136
137    /// Assembles the packets and returns a message
138    /// 
139    /// # Arguments
140    /// 
141    /// * packet - the final packet received, used to get the total number of packets
142    /// * map - the map of all the bytes for the message that needs to be assembled
143    /// 
144    fn assemble_packets(&self, packet: Packet, map: HashMap<usize, Packet>) -> Option<Message> {
145        let mut bytes = vec![];
146        (1..=packet.total_n)
147            .into_iter()
148            .for_each(|n| {
149                let converted = hex::decode(&map[&n].bytes.clone()).unwrap();
150                bytes.extend(converted)
151            });
152        Message::from_bytes(&bytes)
153    }
154    
155    /// Handles and routes a message to the proper component
156    /// 
157    /// # Arguments
158    /// 
159    /// * message - the message to be routed
160    /// * src - the sender of the message
161    /// 
162    fn handle_message(&self, message: Message, src: SocketAddr) {
163        match message.head {
164            Header::Request | Header::Response => {
165                if let Some(msg) = KadMessage::from_bytes(&message.msg) {
166                    if let Err(_) = self.kad_tx.send((src, msg)) {
167                        println!("Error sending to kad");
168                    }
169                }
170            }
171            Header::Ack => {
172                if let Some(ack) = AckMessage::from_bytes(&message.msg) {
173                    if let Err(_) = self.ia_tx.send(ack) {
174                        println!("Error sending ack message")
175                    }
176                }                
177            }
178            Header::Gossip => {
179                if let Err(_) = self.gossip_tx.send((src, message)) {
180                    println!("Error sending to gossip");
181                }
182            }
183        }
184    }
185}