use super::handshake;
use super::msg_types::StreamMessages;
use crate::configuration::middleware_configuration::Configuration;
use bincode::{serialize_into, serialized_size};
use crossbeam::crossbeam_channel::RecvTimeoutError;
use crossbeam::Receiver;
use std::io::{BufWriter, Write};
use std::net::TcpStream;
use std::ops::Mul;
use std::sync::{Arc, Barrier};
use std::time::Duration;
pub fn start(
stream: TcpStream,
middleware_channel: Receiver<(Arc<Barrier>, Arc<Vec<u8>>)>,
local_id: usize,
configuration: Arc<Configuration>,
) {
handshake::send_handshake(&stream, local_id);
let peer_id = handshake::finish_protocol(&stream);
let mut buffered_messages: usize = 0;
let mut buffered_bytes: u64 = 0;
let mut sender_timeout_flag: bool = true;
let mut timeout: Duration = configuration.get_stream_sender_timeout();
let mut stream = BufWriter::new(stream);
loop {
match middleware_channel.recv_timeout(timeout) {
Ok((message_barrier, msg)) => {
if !sender_timeout_flag {
sender_timeout_flag = true;
timeout = configuration.get_stream_sender_timeout();
}
message_barrier.wait();
let stream_msg = StreamMessages::Message {
msg: (*msg).clone(),
};
match serialize_into::<_, StreamMessages>(&mut stream, &stream_msg) {
Ok(_) => {
buffered_messages += 1;
buffered_bytes += serialized_size::<StreamMessages>(&stream_msg).unwrap();
}
Err(_) => {
println!(
"WARN: Stream was closed between {} and {}",
local_id, peer_id
);
break;
}
}
}
Err(e) => {
match e {
RecvTimeoutError::Disconnected => {
let stream_msg = StreamMessages::Close;
serialize_into::<_, StreamMessages>(&mut stream, &stream_msg).unwrap();
break;
}
_ => {}
}
check_buffer_flush(
&mut sender_timeout_flag,
&mut stream,
&mut buffered_messages,
&mut buffered_bytes,
&mut timeout,
&configuration,
true,
);
}
}
check_buffer_flush(
&mut sender_timeout_flag,
&mut stream,
&mut buffered_messages,
&mut buffered_bytes,
&mut timeout,
&configuration,
false,
);
}
}
pub fn calculate_timeout(
timeout_flag: bool,
timeout: Duration,
config: &Arc<Configuration>,
) -> Duration {
let ret_timeout: Duration;
if timeout_flag {
ret_timeout = config.batching.get_lower_timeout();
} else {
if timeout.as_micros() * 2 <= config.batching.get_upper_timeout().as_micros() {
ret_timeout = timeout.mul(2);
} else {
ret_timeout = config.batching.get_upper_timeout();
}
}
ret_timeout
}
pub fn check_buffer_flush(
sender_timeout_flag: &mut bool,
stream: &mut BufWriter<TcpStream>,
buffered_messages: &mut usize,
buffered_bytes: &mut u64,
timeout: &mut Duration,
configuration: &Arc<Configuration>,
error: bool,
) {
if *buffered_messages >= configuration.batching.message_number
|| *buffered_bytes > configuration.batching.size
|| (error && *buffered_messages > 0)
{
if error && *sender_timeout_flag {
*sender_timeout_flag = false;
}
stream.flush().expect("ERROR: Could not flush stream!");
*buffered_messages = 0;
*buffered_bytes = 0;
} else {
if error && *sender_timeout_flag {
*sender_timeout_flag = false;
}
if error {
*timeout = calculate_timeout(*sender_timeout_flag, *timeout, configuration);
}
}
}