timely/dataflow/channels/pushers/
exchange.rs

1//! The exchange pattern distributes pushed data between many target pushees.
2
3use timely_container::PushPartitioned;
4use crate::{Container, Data};
5use crate::communication::Push;
6use crate::dataflow::channels::{BundleCore, Message};
7
8// TODO : Software write combining
9/// Distributes records among target pushees according to a distribution function.
10pub struct Exchange<T, C: Container, D, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> {
11    pushers: Vec<P>,
12    buffers: Vec<C>,
13    current: Option<T>,
14    hash_func: H,
15    phantom: std::marker::PhantomData<D>,
16}
17
18impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64>  Exchange<T, C, D, P, H> {
19    /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
20    pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, C, D, P, H> {
21        let mut buffers = vec![];
22        for _ in 0..pushers.len() {
23            buffers.push(Default::default());
24        }
25        Exchange {
26            pushers,
27            hash_func: key,
28            buffers,
29            current: None,
30            phantom: std::marker::PhantomData,
31        }
32    }
33    #[inline]
34    fn flush(&mut self, index: usize) {
35        if !self.buffers[index].is_empty() {
36            if let Some(ref time) = self.current {
37                Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
38            }
39        }
40    }
41}
42
43impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
44where
45    C: PushPartitioned<Item=D>
46{
47    #[inline(never)]
48    fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
49        // if only one pusher, no exchange
50        if self.pushers.len() == 1 {
51            self.pushers[0].push(message);
52        }
53        else if let Some(message) = message {
54
55            let message = message.as_mut();
56            let time = &message.time;
57            let data = &mut message.data;
58
59            // if the time isn't right, flush everything.
60            if self.current.as_ref().map_or(false, |x| x != time) {
61                for index in 0..self.pushers.len() {
62                    self.flush(index);
63                }
64            }
65            self.current = Some(time.clone());
66
67            let hash_func = &mut self.hash_func;
68
69            // if the number of pushers is a power of two, use a mask
70            if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
71                let mask = (self.pushers.len() - 1) as u64;
72                let pushers = &mut self.pushers;
73                data.push_partitioned(
74                    &mut self.buffers,
75                    move |datum| ((hash_func)(datum) & mask) as usize,
76                    |index, buffer| {
77                            Message::push_at(buffer, time.clone(), &mut pushers[index]);
78                    }
79                );
80            }
81            // as a last resort, use mod (%)
82            else {
83                let num_pushers = self.pushers.len() as u64;
84                let pushers = &mut self.pushers;
85                data.push_partitioned(
86                    &mut self.buffers,
87                    move |datum| ((hash_func)(datum) % num_pushers) as usize,
88                    |index, buffer| {
89                        Message::push_at(buffer, time.clone(), &mut pushers[index]);
90                    }
91                );
92            }
93
94        }
95        else {
96            // flush
97            for index in 0..self.pushers.len() {
98                self.flush(index);
99                self.pushers[index].push(&mut None);
100            }
101        }
102    }
103}