use std::io::{self, Write};
use std::sync::mpsc::{Sender, Receiver};
use crate::networking::MessageHeader;
use super::bytes_slab::{BytesRefill, BytesSlab};
use super::bytes_exchange::MergeQueue;
use super::stream::Stream;
use timely_logging::Logger;
use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
panic!("timely communication error: {}: {}", context, cause)
}
pub fn recv_loop<S>(
mut reader: S,
targets: Vec<Receiver<MergeQueue>>,
worker_offset: usize,
process: usize,
remote: usize,
refill: BytesRefill,
logger: Option<Logger<CommunicationEventBuilder>>
)
where
S: Stream,
{
let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
let mut buffer = BytesSlab::new(20, refill);
let mut stageds = Vec::with_capacity(targets.len());
for _ in 0 .. targets.len() {
stageds.push(Vec::new());
}
let mut active = true;
while active {
buffer.ensure_capacity(1);
assert!(!buffer.empty().is_empty());
let read = match reader.read(buffer.empty()) {
Err(x) => tcp_panic("reading data", x),
Ok(0) => {
tcp_panic(
"reading data",
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
);
}
Ok(n) => n,
};
buffer.make_valid(read);
while let Some(header) = MessageHeader::try_read(buffer.valid()) {
let peeled_bytes = header.required_bytes();
let bytes = buffer.extract(peeled_bytes);
logger.as_mut().map(|logger| {
logger.log(MessageEvent { is_send: false, header, });
});
if header.length > 0 {
for target in header.target_lower .. header.target_upper {
stageds[target - worker_offset].push(bytes.clone());
}
}
else {
active = false;
if !buffer.valid().is_empty() {
panic!("Clean shutdown followed by data.");
}
buffer.ensure_capacity(1);
if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
panic!("Clean shutdown followed by data.");
}
}
}
for (index, staged) in stageds.iter_mut().enumerate() {
use crate::allocator::zero_copy::bytes_exchange::BytesPush;
targets[index].extend(staged.drain(..));
}
}
logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
}
pub fn send_loop<S: Stream>(
writer: S,
sources: Vec<Sender<MergeQueue>>,
process: usize,
remote: usize,
logger: Option<Logger<CommunicationEventBuilder>>)
{
let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
x.send(queue.clone()).expect("failed to send MergeQueue");
queue
}).collect();
let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
let mut stash = Vec::new();
while !sources.is_empty() {
for source in sources.iter_mut() {
use crate::allocator::zero_copy::bytes_exchange::BytesPull;
source.drain_into(&mut stash);
}
if stash.is_empty() {
writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
sources.retain(|source| !source.is_complete());
if !sources.is_empty() {
std::thread::park();
}
}
else {
for bytes in stash.drain(..) {
logger.as_mut().map(|logger| {
let mut offset = 0;
while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
logger.log(MessageEvent { is_send: true, header, });
offset += header.required_bytes();
}
});
writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
}
}
}
let header = MessageHeader {
channel: 0,
source: 0,
target_lower: 0,
target_upper: 0,
length: 0,
seqno: 0,
};
header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
}