differential_dataflow/columnar/
exchange.rs1use std::rc::Rc;
7
8use columnar::{Index, Len};
9use timely::logging::TimelyLogger;
10use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor};
11use timely::dataflow::channels::Message;
12use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
13use timely::progress::Timestamp;
14use timely::worker::Worker;
15
16use super::layout::ColumnarUpdate as Update;
17use super::updates::UpdatesTyped;
18use super::RecordedUpdates;
19
20pub struct ValDistributor<U: Update, H> {
22 marker: std::marker::PhantomData<U>,
23 hashfunc: H,
24 pre_lens: Vec<usize>,
25}
26
27impl<U: Update, H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor<RecordedUpdates<U>> for ValDistributor<U, H> {
28 fn partition<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, container: &mut RecordedUpdates<U>, time: &T, pushers: &mut [P]) {
33 use super::updates::child_range;
34
35 let view = container.updates.view();
36 let keys_b = view.keys;
37 let mut outputs: Vec<UpdatesTyped<U>> = (0..pushers.len()).map(|_| UpdatesTyped::default()).collect();
38
39 for outer in 0..Len::len(&keys_b) {
41 self.pre_lens.clear();
42 self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len()));
43 if pushers.len().is_power_of_two() {
44 let mask = (pushers.len() - 1) as u64;
45 for k in child_range(keys_b.bounds, outer) {
46 let key = keys_b.values.get(k);
47 let h = (self.hashfunc)(key);
48 let idx = (h & mask) as usize;
49 outputs[idx].extend_from_keys(view, k..k+1);
50 }
51 }
52 else {
53 let pushers_len = pushers.len() as u64;
54 for k in child_range(keys_b.bounds, outer) {
55 let key = keys_b.values.get(k);
56 let h = (self.hashfunc)(key);
57 let idx = (h % pushers_len) as usize;
58 outputs[idx].extend_from_keys(view, k..k+1);
59 }
60 }
61 for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) {
62 if output.keys.values.len() > pre {
63 output.keys.bounds.push(output.keys.values.len() as u64);
64 }
65 }
66 }
67
68 let total_records = container.records;
70 let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count();
71 let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1));
72 for (pusher, output) in pushers.iter_mut().zip(outputs) {
73 if !output.keys.values.is_empty() {
74 let recorded = RecordedUpdates { updates: output.into(), records: first_records, consolidated: container.consolidated };
75 first_records = 1;
76 let mut recorded = recorded;
77 Message::push_at(&mut recorded, time.clone(), pusher);
78 }
79 }
80 }
81 fn flush<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, _time: &T, _pushers: &mut [P]) { }
82 fn relax(&mut self) { }
83}
84
/// Parallelization contract that exchanges `RecordedUpdates` containers between
/// workers, routing by a hash of each key.
pub struct ValPact<H> {
    /// Hash function handed to the `ValDistributor` built in `connect`; it maps
    /// a key reference to a `u64`.
    pub hashfunc: H,
}
90
91impl<T, U, H> ParallelizationContract<T, RecordedUpdates<U>> for ValPact<H>
92where
93 T: Timestamp,
94 U: Update,
95 H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static,
96{
97 type Pusher = Exchange<
98 T,
99 LogPusher<Box<dyn timely::communication::Push<Message<T, RecordedUpdates<U>>>>>,
100 ValDistributor<U, H>
101 >;
102 type Puller = LogPuller<Box<dyn timely::communication::Pull<Message<T, RecordedUpdates<U>>>>>;
103
104 fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
105 let (senders, receiver) = worker.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
106 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
107 let distributor = ValDistributor {
108 marker: std::marker::PhantomData,
109 hashfunc: self.hashfunc,
110 pre_lens: Vec::new(),
111 };
112 (Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone()))
113 }
114}