use std::rc::Rc;
use columnar::{Index, Len};
use timely::logging::TimelyLogger;
use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor};
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
use timely::progress::Timestamp;
use timely::worker::Worker;
use super::layout::ColumnarUpdate as Update;
use super::updates::UpdatesTyped;
use super::RecordedUpdates;
pub struct ValDistributor<U: Update, H> {
marker: std::marker::PhantomData<U>,
hashfunc: H,
pre_lens: Vec<usize>,
}
impl<U: Update, H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor<RecordedUpdates<U>> for ValDistributor<U, H> {
fn partition<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, container: &mut RecordedUpdates<U>, time: &T, pushers: &mut [P]) {
use super::updates::child_range;
let view = container.updates.view();
let keys_b = view.keys;
let mut outputs: Vec<UpdatesTyped<U>> = (0..pushers.len()).map(|_| UpdatesTyped::default()).collect();
for outer in 0..Len::len(&keys_b) {
self.pre_lens.clear();
self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len()));
if pushers.len().is_power_of_two() {
let mask = (pushers.len() - 1) as u64;
for k in child_range(keys_b.bounds, outer) {
let key = keys_b.values.get(k);
let h = (self.hashfunc)(key);
let idx = (h & mask) as usize;
outputs[idx].extend_from_keys(view, k..k+1);
}
}
else {
let pushers_len = pushers.len() as u64;
for k in child_range(keys_b.bounds, outer) {
let key = keys_b.values.get(k);
let h = (self.hashfunc)(key);
let idx = (h % pushers_len) as usize;
outputs[idx].extend_from_keys(view, k..k+1);
}
}
for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) {
if output.keys.values.len() > pre {
output.keys.bounds.push(output.keys.values.len() as u64);
}
}
}
let total_records = container.records;
let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count();
let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1));
for (pusher, output) in pushers.iter_mut().zip(outputs) {
if !output.keys.values.is_empty() {
let recorded = RecordedUpdates { updates: output.into(), records: first_records, consolidated: container.consolidated };
first_records = 1;
let mut recorded = recorded;
Message::push_at(&mut recorded, time.clone(), pusher);
}
}
}
fn flush<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, _time: &T, _pushers: &mut [P]) { }
fn relax(&mut self) { }
}
pub struct ValPact<H> {
pub hashfunc: H,
}
impl<T, U, H> ParallelizationContract<T, RecordedUpdates<U>> for ValPact<H>
where
T: Timestamp,
U: Update,
H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static,
{
type Pusher = Exchange<
T,
LogPusher<Box<dyn timely::communication::Push<Message<T, RecordedUpdates<U>>>>>,
ValDistributor<U, H>
>;
type Puller = LogPuller<Box<dyn timely::communication::Pull<Message<T, RecordedUpdates<U>>>>>;
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = worker.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = ValDistributor {
marker: std::marker::PhantomData,
hashfunc: self.hashfunc,
pre_lens: Vec::new(),
};
(Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone()))
}
}