timely/dataflow/channels/pushers/
exchange.rs1use timely_container::PushPartitioned;
4use crate::{Container, Data};
5use crate::communication::Push;
6use crate::dataflow::channels::{BundleCore, Message};
7
8pub struct Exchange<T, C: Container, D, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> {
11 pushers: Vec<P>,
12 buffers: Vec<C>,
13 current: Option<T>,
14 hash_func: H,
15 phantom: std::marker::PhantomData<D>,
16}
17
18impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Exchange<T, C, D, P, H> {
19 pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, C, D, P, H> {
21 let mut buffers = vec![];
22 for _ in 0..pushers.len() {
23 buffers.push(Default::default());
24 }
25 Exchange {
26 pushers,
27 hash_func: key,
28 buffers,
29 current: None,
30 phantom: std::marker::PhantomData,
31 }
32 }
33 #[inline]
34 fn flush(&mut self, index: usize) {
35 if !self.buffers[index].is_empty() {
36 if let Some(ref time) = self.current {
37 Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
38 }
39 }
40 }
41}
42
43impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
44where
45 C: PushPartitioned<Item=D>
46{
47 #[inline(never)]
48 fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
49 if self.pushers.len() == 1 {
51 self.pushers[0].push(message);
52 }
53 else if let Some(message) = message {
54
55 let message = message.as_mut();
56 let time = &message.time;
57 let data = &mut message.data;
58
59 if self.current.as_ref().map_or(false, |x| x != time) {
61 for index in 0..self.pushers.len() {
62 self.flush(index);
63 }
64 }
65 self.current = Some(time.clone());
66
67 let hash_func = &mut self.hash_func;
68
69 if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
71 let mask = (self.pushers.len() - 1) as u64;
72 let pushers = &mut self.pushers;
73 data.push_partitioned(
74 &mut self.buffers,
75 move |datum| ((hash_func)(datum) & mask) as usize,
76 |index, buffer| {
77 Message::push_at(buffer, time.clone(), &mut pushers[index]);
78 }
79 );
80 }
81 else {
83 let num_pushers = self.pushers.len() as u64;
84 let pushers = &mut self.pushers;
85 data.push_partitioned(
86 &mut self.buffers,
87 move |datum| ((hash_func)(datum) % num_pushers) as usize,
88 |index, buffer| {
89 Message::push_at(buffer, time.clone(), &mut pushers[index]);
90 }
91 );
92 }
93
94 }
95 else {
96 for index in 0..self.pushers.len() {
98 self.flush(index);
99 self.pushers[index].push(&mut None);
100 }
101 }
102 }
103}