use std::io;
use std::net::TcpStream;

use bincode;
use crossbeam;
use crossbeam_channel::{Receiver, Sender};
use hbbft::SourcedMessage;
use log::{debug, error, info};
use serde::{de::DeserializeOwned, Serialize};
/// A communication task that bridges one TCP connection and a pair of
/// in-process channels.
///
/// Messages decoded from `stream` are forwarded on `tx`, tagged with
/// `node_index` as their source; messages taken from `rx` are encoded and
/// written to `stream`.
pub struct CommsTask<'a, M> {
    /// Sending end of the channel on which messages read from the socket are
    /// delivered, wrapped in a `SourcedMessage`.
    tx: &'a Sender<SourcedMessage<M, usize>>,
    /// Receiving end of the channel supplying messages to write to the socket.
    rx: &'a Receiver<M>,
    /// The TCP connection this task reads from and writes to.
    stream: TcpStream,
    /// Index used as the `source` of every message forwarded on `tx`.
    pub node_index: usize,
}
impl<'a, M: Serialize + DeserializeOwned + Send + 'a> CommsTask<'a, M> {
pub fn new(
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
stream: TcpStream,
node_index: usize,
) -> Self {
debug!(
"Creating comms task #{} for {:?}",
node_index,
stream.peer_addr().unwrap()
);
CommsTask {
tx,
rx,
stream,
node_index,
}
}
pub fn run(mut self) -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
let tx = self.tx;
let rx = self.rx;
let mut stream1 = match self.stream.try_clone() {
Ok(stream) => stream,
Err(e) => return Err(Box::new(e)),
};
let node_index = self.node_index;
crossbeam::scope(move |scope| {
scope.spawn(move |_| {
loop {
let message = rx.recv().unwrap();
bincode::serialize_into(&mut stream1, &message)
.expect("message serialization failed");
}
});
debug!("Starting remote RX loop for node {}", node_index);
loop {
match bincode::deserialize_from(&mut self.stream) {
Ok(message) => {
tx.send(SourcedMessage {
source: node_index,
message,
})
.unwrap();
}
Err(err) => {
if let bincode::ErrorKind::Io(ref io_err) = *err {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
info!("Node {} disconnected.", node_index);
break;
}
}
panic!("Node {} - Deserialization error {:?}", node_index, err);
}
}
}
})
}
}