use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use std::sync::mpsc::{Sender, Receiver};
use timely_bytes::arc::Bytes;
use crate::networking::MessageHeader;
use crate::{Allocate, Push, Pull};
use crate::allocator::{Process, ProcessBuilder, Exchangeable};
use crate::allocator::canary::Canary;
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};
pub struct TcpBuilder {
inner: ProcessBuilder,
index: usize, peers: usize, futures: Vec<Receiver<MergeQueue>>, promises: Vec<Sender<MergeQueue>>, refill: BytesRefill,
}
pub(crate) fn new_vector(
allocators: Vec<ProcessBuilder>,
my_process: usize,
processes: usize,
refill: BytesRefill,
) -> (Vec<TcpBuilder>,
Vec<Vec<Sender<MergeQueue>>>,
Vec<Vec<Receiver<MergeQueue>>>)
{
let threads = allocators.len();
let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);
let builders =
allocators
.into_iter()
.zip(worker_promises)
.zip(worker_futures)
.enumerate()
.map(|(index, ((inner, promises), futures))| {
TcpBuilder {
inner,
index: my_process * threads + index,
peers: threads * processes,
promises,
futures,
refill: refill.clone(),
}})
.collect();
(builders, network_promises, network_futures)
}
impl TcpBuilder {
pub fn build(self) -> TcpAllocator {
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
}
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.futures.into_iter() {
let queue = pusher.recv().expect("Failed to receive push queue");
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
sends.push(Rc::new(RefCell::new(sendpoint)));
}
TcpAllocator {
inner: self.inner.build(),
index: self.index,
peers: self.peers,
canaries: Rc::new(RefCell::new(Vec::new())),
channel_id_bound: None,
staged: Vec::new(),
sends,
recvs,
to_local: HashMap::new(),
}
}
}
pub struct TcpAllocator {
inner: Process,
index: usize, peers: usize,
staged: Vec<Bytes>, canaries: Rc<RefCell<Vec<usize>>>,
channel_id_bound: Option<usize>,
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
impl Allocate for TcpAllocator {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
}
self.channel_id_bound = Some(identifier);
let mut pushes = Vec::<Box<dyn Push<T>>>::new();
let inner_peers = self.inner.peers();
let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
for target_index in 0 .. self.peers() {
let mut process_id = target_index / inner_peers;
if process_id == self.index / inner_peers {
pushes.push(inner_sends.remove(0));
}
else {
let header = MessageHeader {
channel: identifier,
source: self.index,
target_lower: target_index,
target_upper: target_index + 1,
length: 0,
seqno: 0,
};
if process_id > self.index / inner_peers { process_id -= 1; }
pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id]))));
}
}
let channel = Rc::clone(self.to_local.entry(identifier).or_default());
use crate::allocator::counters::Puller as CountPuller;
let canary = Canary::new(identifier, Rc::clone(&self.canaries));
let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
(pushes, puller, )
}
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
}
self.channel_id_bound = Some(identifier);
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);
let inner_peers = self.inner.peers();
let (inner_send, inner_recv) = self.inner.broadcast(identifier);
pushes.push(inner_send);
for (mut index, send) in self.sends.iter().enumerate() {
if index >= self.index/inner_peers { index += 1; }
let header = MessageHeader {
channel: identifier,
source: self.index,
target_lower: index * inner_peers,
target_upper: index * inner_peers + inner_peers,
length: 0,
seqno: 0,
};
pushes.push(Box::new(Pusher::new(header, Rc::clone(send))))
}
let channel = Rc::clone(self.to_local.entry(identifier).or_default());
use crate::allocator::counters::Puller as CountPuller;
let canary = Canary::new(identifier, Rc::clone(&self.canaries));
let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
(pushes, puller, )
}
#[inline(never)]
fn receive(&mut self) {
let mut canaries = self.canaries.borrow_mut();
for dropped_channel in canaries.drain(..) {
let _dropped =
self.to_local
.remove(&dropped_channel)
.expect("non-existent channel dropped");
}
::std::mem::drop(canaries);
self.inner.receive();
for recv in self.recvs.iter_mut() {
recv.drain_into(&mut self.staged);
}
let mut events = self.inner.events().borrow_mut();
for mut bytes in self.staged.drain(..) {
while !bytes.is_empty() {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {
let mut peel = bytes.extract_to(header.required_bytes());
let _ = peel.extract_to(header.header_bytes());
events.push(header.channel);
match self.to_local.entry(header.channel) {
Entry::Vacant(entry) => {
if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
entry.insert(Rc::new(RefCell::new(VecDeque::new())))
.borrow_mut()
.push_back(peel);
}
}
Entry::Occupied(mut entry) => {
entry.get_mut().borrow_mut().push_back(peel);
}
}
}
else {
println!("failed to read full header!");
}
}
}
}
fn release(&mut self) {
for send in self.sends.iter_mut() {
send.borrow_mut().publish();
}
}
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}