Skip to main content

differential_dataflow/columnar/
exchange.rs

1//! Exchange / parallelization contract for `RecordedUpdates`.
2//!
3//! `ValPact` is the PACT used when shuffling columnar updates across workers;
4//! `ValDistributor` is the per-worker partitioner it constructs.
5
6use std::rc::Rc;
7
8use columnar::{Index, Len};
9use timely::logging::TimelyLogger;
10use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor};
11use timely::dataflow::channels::Message;
12use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
13use timely::progress::Timestamp;
14use timely::worker::Worker;
15
16use super::layout::ColumnarUpdate as Update;
17use super::updates::UpdatesTyped;
18use super::RecordedUpdates;
19
/// Distributor that routes `RecordedUpdates` records to workers by hashing keys.
///
/// No trait bounds are placed on the struct itself; the `Distributor` impl
/// states the bounds it actually needs.
pub struct ValDistributor<U, H> {
    /// Ties the distributor to the update type `U` without storing one.
    marker: std::marker::PhantomData<U>,
    /// Hash applied to each key; the result picks the destination worker.
    hashfunc: H,
    /// Scratch buffer reused across calls: per-destination key counts
    /// snapshotted before each outer key group is routed.
    pre_lens: Vec<usize>,
}
26
27impl<U: Update, H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor<RecordedUpdates<U>> for ValDistributor<U, H> {
28    // TODO: For unsorted UpdatesTyped (stride-1 outer keys), each key is its own outer group,
29    // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should
30    // either batch keys by destination first, or detect stride-1 outer bounds and use a
31    // simpler single-pass partitioning that seals once at the end.
32    fn partition<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, container: &mut RecordedUpdates<U>, time: &T, pushers: &mut [P]) {
33        use super::updates::child_range;
34
35        let view = container.updates.view();
36        let keys_b = view.keys;
37        let mut outputs: Vec<UpdatesTyped<U>> = (0..pushers.len()).map(|_| UpdatesTyped::default()).collect();
38
39        // Each outer key group becomes a separate run in the destination.
40        for outer in 0..Len::len(&keys_b) {
41            self.pre_lens.clear();
42            self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len()));
43            if pushers.len().is_power_of_two() {
44                let mask = (pushers.len() - 1) as u64;
45                for k in child_range(keys_b.bounds, outer) {
46                    let key = keys_b.values.get(k);
47                    let h = (self.hashfunc)(key);
48                    let idx = (h & mask) as usize;
49                    outputs[idx].extend_from_keys(view, k..k+1);
50                }
51            }
52            else {
53                let pushers_len = pushers.len() as u64;
54                for k in child_range(keys_b.bounds, outer) {
55                    let key = keys_b.values.get(k);
56                    let h = (self.hashfunc)(key);
57                    let idx = (h % pushers_len) as usize;
58                    outputs[idx].extend_from_keys(view, k..k+1);
59                }
60            }
61            for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) {
62                if output.keys.values.len() > pre {
63                    output.keys.bounds.push(output.keys.values.len() as u64);
64                }
65            }
66        }
67
68        // Distribute the input's record count across non-empty outputs.
69        let total_records = container.records;
70        let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count();
71        let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1));
72        for (pusher, output) in pushers.iter_mut().zip(outputs) {
73            if !output.keys.values.is_empty() {
74                let recorded = RecordedUpdates { updates: output.into(), records: first_records, consolidated: container.consolidated };
75                first_records = 1;
76                let mut recorded = recorded;
77                Message::push_at(&mut recorded, time.clone(), pusher);
78            }
79        }
80    }
81    fn flush<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, _time: &T, _pushers: &mut [P]) { }
82    fn relax(&mut self) { }
83}
84
/// Parallelization contract (PACT) that exchanges `RecordedUpdates`
/// containers between workers, routing every key by the hash in `hashfunc`.
pub struct ValPact<H> {
    /// Hashes one borrowed key; the hash chooses the receiving worker.
    pub hashfunc: H,
}
90
91impl<T, U, H> ParallelizationContract<T, RecordedUpdates<U>> for ValPact<H>
92where
93    T: Timestamp,
94    U: Update,
95    H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static,
96{
97    type Pusher = Exchange<
98        T,
99        LogPusher<Box<dyn timely::communication::Push<Message<T, RecordedUpdates<U>>>>>,
100        ValDistributor<U, H>
101    >;
102    type Puller = LogPuller<Box<dyn timely::communication::Pull<Message<T, RecordedUpdates<U>>>>>;
103
104    fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
105        let (senders, receiver) = worker.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
106        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
107        let distributor = ValDistributor {
108            marker: std::marker::PhantomData,
109            hashfunc: self.hashfunc,
110            pre_lens: Vec::new(),
111        };
112        (Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone()))
113    }
114}