use super::dot::Dot;
use super::graph::GRAPH;
use super::message_types::ClientMessage;
use crate::configuration::middleware_configuration::Configuration;
use crate::graph::structs::message::Message;
use crate::graph::structs::message_type::ClientPeerMiddleware;
use bincode::serialize;
use crossbeam::{Receiver, Sender};
use std::sync::{Arc, Barrier};
pub fn start(
local_id: usize,
peer_addresses: Vec<String>,
receive_channel: Receiver<ClientPeerMiddleware>,
client: Sender<ClientMessage>,
peer_channels: Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>>,
configuration: Arc<Configuration>,
) {
let mut tcb = GRAPH::new(
local_id,
peer_addresses.len() + 1,
client.clone(),
Arc::clone(&configuration),
);
loop {
match receive_channel.recv() {
Ok(ClientPeerMiddleware::Client { dot, msg, context }) => {
handle_message_from_client(&mut tcb, msg, &peer_channels, context, dot);
}
Ok(ClientPeerMiddleware::Peer { msg }) => {
tcb.receive(msg);
}
Ok(ClientPeerMiddleware::Setup) => {}
Ok(ClientPeerMiddleware::Stable { dot }) => {
tcb.deletestable(dot);
}
Ok(ClientPeerMiddleware::End) => {
handle_finished_setup(&client);
break;
}
Err(_) => {
break;
}
}
}
}
fn handle_message_from_client(
tcb: &mut GRAPH,
payload: Vec<u8>,
channels: &Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>>,
context: Vec<Dot>,
dot: Dot,
) {
let message = Message::new(payload, dot, context);
tcb.dequeue(message.clone());
let encoded_message: Vec<u8> =
serialize(&message).expect("ERROR: Couldn't serialize the CLIENT message");
let arc_msg = Arc::new(encoded_message);
let stream_sender_barrier = Arc::new(Barrier::new(channels.len()));
for channel in channels {
match &channel.send((Arc::clone(&stream_sender_barrier), Arc::clone(&arc_msg))) {
Ok(_) => {}
Err(e) => {
println!("ERROR: Could not send message to sender threads\n\t- {}", e);
}
}
}
}
fn handle_finished_setup(client: &Sender<ClientMessage>) {
client
.send(ClientMessage::Empty)
.expect("ERROR: Failed to send the finishing SETUP message to client");
}