tcb/graph/communication/
reader.rs1use super::msg_types::StreamMessages;
2use crate::graph::structs::message::Message;
3use crate::graph::structs::message_type::ClientPeerMiddleware;
4use bincode::{deserialize, deserialize_from};
5use crossbeam::Sender;
6use std::net::TcpStream;
7use std::sync::{Arc, Barrier};
8use std::usize;
9
10pub fn start(
27 stream: TcpStream,
28 middleware_channel: Sender<ClientPeerMiddleware>,
29 local_id: usize,
30 peer_id: usize,
31 setup_end_barrier: Arc<Barrier>,
32) {
33 setup_end_barrier.wait();
34
35 loop {
36 match deserialize_from::<_, StreamMessages>(&stream) {
37 Ok(decoded_msg_type) => match decoded_msg_type {
38 StreamMessages::Message { msg } => {
39 handle_received_peer_msg(msg, &middleware_channel);
40 }
41
42 StreamMessages::Close => {
43 break;
44 }
45 m => {
46 println!("ERROR: Reader received unexpected type - {:?}", m);
47 break;
48 }
49 },
50 Err(e) => {
51 println!(
52 "ERROR: {} is closing a connection with: {}\n\t{}",
53 local_id, peer_id, e
54 );
55 break;
56 }
57 }
58 }
59}
60
61fn handle_received_peer_msg(msg: Vec<u8>, send_main_mid: &Sender<ClientPeerMiddleware>) {
62 let decoded_msg: Message = deserialize(&msg)
64 .expect("ERROR: Couldn't deserialize the Message type after reading from the stream");
65
66 let peer_msg = ClientPeerMiddleware::Peer { msg: decoded_msg };
67
68 send_main_mid
70 .send(peer_msg)
71 .expect("ERROR: Failed to send message to main middleware thread");
72}