use crate::ContainerBuilder;
use crate::communication::Push;
use crate::container::{DrainContainer, PushInto};
use crate::dataflow::channels::Message;
pub trait Distributor<C> {
fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
fn relax(&mut self) { }
}
pub struct DrainContainerDistributor<CB, H> {
builders: Vec<CB>,
hash_func: H,
}
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
pub fn new(hash_func: H, peers: usize) -> Self {
Self {
builders: std::iter::repeat_with(Default::default).take(peers).collect(),
hash_func,
}
}
}
impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
where
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64,
{
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
debug_assert_eq!(self.builders.len(), pushers.len());
if pushers.len().is_power_of_two() {
let mask = (pushers.len() - 1) as u64;
for datum in container.drain() {
let index = ((self.hash_func)(&datum) & mask) as usize;
self.builders[index].push_into(datum);
while let Some(produced) = self.builders[index].extract() {
Message::push_at(produced, time.clone(), &mut pushers[index]);
}
}
}
else {
let num_pushers = pushers.len() as u64;
for datum in container.drain() {
let index = ((self.hash_func)(&datum) % num_pushers) as usize;
self.builders[index].push_into(datum);
while let Some(produced) = self.builders[index].extract() {
Message::push_at(produced, time.clone(), &mut pushers[index]);
}
}
}
}
fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
while let Some(container) = builder.finish() {
Message::push_at(container, time.clone(), pusher);
}
}
}
fn relax(&mut self) {
for builder in &mut self.builders {
builder.relax();
}
}
}
pub struct Exchange<T, P, D> {
pushers: Vec<P>,
current: Option<T>,
distributor: D,
}
impl<T: Clone, P, D> Exchange<T, P, D> {
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
Exchange {
pushers,
current: None,
distributor,
}
}
}
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
where
P: Push<Message<T, C>>,
D: Distributor<C>,
{
#[inline(never)]
fn push(&mut self, message: &mut Option<Message<T, C>>) {
if self.pushers.len() == 1 {
self.pushers[0].push(message);
}
else if let Some(message) = message {
let time = &message.time;
let data = &mut message.data;
match self.current.as_ref() {
Some(current_time) if current_time != time => {
self.distributor.flush(current_time, &mut self.pushers);
self.current = Some(time.clone());
}
None => self.current = Some(time.clone()),
_ => {}
}
self.distributor.partition(data, time, &mut self.pushers);
}
else {
if let Some(time) = self.current.take() {
self.distributor.flush(&time, &mut self.pushers);
}
self.distributor.relax();
for index in 0..self.pushers.len() {
self.pushers[index].done();
}
}
}
}