Skip to main content

tcb/graph/communication/
reader.rs

1use super::msg_types::StreamMessages;
2use crate::graph::structs::message::Message;
3use crate::graph::structs::message_type::ClientPeerMiddleware;
4use bincode::{deserialize, deserialize_from};
5use crossbeam::Sender;
6use std::net::TcpStream;
7use std::sync::{Arc, Barrier};
8use std::usize;
9
10/**
11 * Starts a Reader thread that receives messages from a stream
12 * and sends them to the middleware.
13 *
14 * # Arguments
15 *
16 * `stream` - TCP stream between the peers.
17 *
18 * `middleware_channel` - Channel from the the Reader to the Middleware.
19 *
20 * `local_id` - Local peer's globally unique id.
21 *
22 * `peer_id` - Other peer's globally unique id.
23 *
24 * `setup_end_barrier` - Barrier signalling the middleware connected to every peer.
25 */
26pub fn start(
27    stream: TcpStream,
28    middleware_channel: Sender<ClientPeerMiddleware>,
29    local_id: usize,
30    peer_id: usize,
31    setup_end_barrier: Arc<Barrier>,
32) {
33    setup_end_barrier.wait();
34
35    loop {
36        match deserialize_from::<_, StreamMessages>(&stream) {
37            Ok(decoded_msg_type) => match decoded_msg_type {
38                StreamMessages::Message { msg } => {
39                    handle_received_peer_msg(msg, &middleware_channel);
40                }
41
42                StreamMessages::Close => {
43                    break;
44                }
45                m => {
46                    println!("ERROR: Reader received unexpected type - {:?}", m);
47                    break;
48                }
49            },
50            Err(e) => {
51                println!(
52                    "ERROR: {} is closing a connection with: {}\n\t{}",
53                    local_id, peer_id, e
54                );
55                break;
56            }
57        }
58    }
59}
60
61fn handle_received_peer_msg(msg: Vec<u8>, send_main_mid: &Sender<ClientPeerMiddleware>) {
62    //Deserializing the vec of bytes to Message struct
63    let decoded_msg: Message = deserialize(&msg)
64        .expect("ERROR: Couldn't deserialize the Message type after reading from the stream");
65
66    let peer_msg = ClientPeerMiddleware::Peer { msg: decoded_msg };
67
68    //Sending the payload to the main middleware thread
69    send_main_mid
70        .send(peer_msg)
71        .expect("ERROR: Failed to send message to main middleware thread");
72}