noir_compute/operator/join/
local_hash.rs

1#![allow(clippy::type_complexity)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt::Display;
5use std::marker::PhantomData;
6
7use crate::block::{BlockStructure, OperatorStructure};
8use crate::network::Coord;
9use crate::operator::join::ship::{ShipBroadcastRight, ShipHash, ShipStrategy};
10use crate::operator::join::{InnerJoinTuple, JoinVariant, LeftJoinTuple, OuterJoinTuple};
11use crate::operator::start::{BinaryElement, BinaryStartOperator};
12use crate::operator::{DataKey, ExchangeData, KeyerFn, Operator, StreamElement};
13use crate::scheduler::ExecutionMetadata;
14use crate::stream::{KeyedStream, Stream};
15
16/// This type keeps the elements of a side of the join.
17#[derive(Debug, Clone)]
18struct SideHashMap<Key: DataKey, Out> {
19    /// The actual items on this side, grouped by key.
20    ///
21    /// Note that when the other side ends this map is emptied.
22    data: HashMap<Key, Vec<Out>, crate::block::GroupHasherBuilder>,
23    /// The set of all the keys seen.
24    ///
25    /// Note that when this side ends this set is emptied since it won't be used again.
26    keys: HashSet<Key>,
27    /// Whether this side has ended.
28    ended: bool,
29    /// The number of items received.
30    count: usize,
31}
32
33impl<Key: DataKey, Out> Default for SideHashMap<Key, Out> {
34    fn default() -> Self {
35        Self {
36            data: Default::default(),
37            keys: Default::default(),
38            ended: false,
39            count: 0,
40        }
41    }
42}
43
44/// This operator performs the join using the local hash strategy.
45///
46/// This operator is able to produce the outer join tuples (the most general type of join), but it
47/// can be asked to skip generating the `None` tuples if the join was actually inner.
48#[derive(Clone, Debug)]
49struct JoinLocalHash<
50    Key: DataKey,
51    Out1: ExchangeData,
52    Out2: ExchangeData,
53    Keyer1: KeyerFn<Key, Out1>,
54    Keyer2: KeyerFn<Key, Out2>,
55    OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
56> {
57    prev: OperatorChain,
58    coord: Coord,
59
60    /// The content of the left side.
61    left: SideHashMap<Key, Out1>,
62    /// The content of the right side.
63    right: SideHashMap<Key, Out2>,
64
65    keyer1: Keyer1,
66    keyer2: Keyer2,
67
68    /// The variant of join to build.
69    ///
70    /// This is used for optimizing the behaviour in case of inner and left joins, avoiding to
71    /// generate useless tuples.
72    variant: JoinVariant,
73    /// The already generated tuples, but not yet returned.
74    buffer: VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
75}
76
77impl<
78        Key: DataKey,
79        Out1: ExchangeData,
80        Out2: ExchangeData,
81        Keyer1: KeyerFn<Key, Out1>,
82        Keyer2: KeyerFn<Key, Out2>,
83        OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
84    > Display for JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
85{
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(
88            f,
89            "{} -> JoinLocalHash<{}>",
90            self.prev,
91            std::any::type_name::<Key>()
92        )
93    }
94}
95
96impl<
97        Key: DataKey,
98        Out1: ExchangeData,
99        Out2: ExchangeData,
100        Keyer1: KeyerFn<Key, Out1>,
101        Keyer2: KeyerFn<Key, Out2>,
102        OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
103    > JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
104{
105    fn new(prev: OperatorChain, variant: JoinVariant, keyer1: Keyer1, keyer2: Keyer2) -> Self {
106        Self {
107            prev,
108            coord: Default::default(),
109            left: Default::default(),
110            right: Default::default(),
111            keyer1,
112            keyer2,
113            variant,
114            buffer: Default::default(),
115        }
116    }
117
118    /// Add a new item on the _left_ side, storing the newly generated tuples inside the buffer.
119    ///
120    /// This can be used to add _right_ tuples by swapping left and right parameters.
121    fn add_item<OutL: ExchangeData, OutR: ExchangeData>(
122        (key, item): (Key, OutL),
123        left: &mut SideHashMap<Key, OutL>,
124        right: &mut SideHashMap<Key, OutR>,
125        left_outer: bool,
126        right_outer: bool,
127        buffer: &mut VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
128        make_pair: impl Fn(Option<OutL>, Option<OutR>) -> OuterJoinTuple<Out1, Out2>,
129    ) {
130        left.count += 1;
131        if let Some(right) = right.data.get(&key) {
132            // the left item has at least one right matching element
133            for rhs in right {
134                buffer.push_back((
135                    key.clone(),
136                    make_pair(Some(item.clone()), Some(rhs.clone())),
137                ));
138            }
139        } else if right.ended && left_outer {
140            // if the left item has no right correspondent, but the right has already ended
141            // we might need to generate the outer tuple.
142            buffer.push_back((key.clone(), make_pair(Some(item.clone()), None)));
143        } else {
144            // either the rhs is not ended (so we cannot generate anything for now), or
145            // it's left inner, so we cannot generate left-outer tuples.
146        }
147        if right_outer {
148            left.keys.insert(key.clone());
149        }
150        if !right.ended {
151            left.data.entry(key).or_default().push(item);
152        }
153    }
154
155    /// Mark the left side as ended, generating all the remaining tuples if the join is outer.
156    ///
157    /// This can be used to mark also the right side by swapping the parameters.
158    fn side_ended<OutL, OutR>(
159        right_outer: bool,
160        left: &mut SideHashMap<Key, OutL>,
161        right: &mut SideHashMap<Key, OutR>,
162        buffer: &mut VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
163        make_pair: impl Fn(Option<OutL>, Option<OutR>) -> OuterJoinTuple<Out1, Out2>,
164    ) {
165        if right_outer {
166            // left ended and this is a right-outer, so we need to generate (None, Some)
167            // tuples. For each value on the right side, before dropping the right hashmap,
168            // search if there was already a match.
169            for (key, right) in right.data.drain() {
170                if !left.keys.contains(&key) {
171                    for rhs in right {
172                        buffer.push_back((key.clone(), make_pair(None, Some(rhs))));
173                    }
174                }
175            }
176        } else {
177            // in any case, we won't need the right hashmap anymore.
178            right.data.clear();
179        }
180        // we will never look at it, and nothing will be inserted, drop it freeing some memory.
181        left.keys.clear();
182        left.ended = true;
183    }
184}
185
186impl<
187        Key: DataKey,
188        Out1: ExchangeData,
189        Out2: ExchangeData,
190        Keyer1: KeyerFn<Key, Out1>,
191        Keyer2: KeyerFn<Key, Out2>,
192        OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
193    > Operator for JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
194{
195    type Out = (Key, OuterJoinTuple<Out1, Out2>);
196
197    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
198        self.coord = metadata.coord;
199        self.prev.setup(metadata);
200    }
201
202    fn next(&mut self) -> StreamElement<(Key, OuterJoinTuple<Out1, Out2>)> {
203        while self.buffer.is_empty() {
204            match self.prev.next() {
205                StreamElement::Item(BinaryElement::Left(item)) => Self::add_item(
206                    ((self.keyer1)(&item), item),
207                    &mut self.left,
208                    &mut self.right,
209                    self.variant.left_outer(),
210                    self.variant.right_outer(),
211                    &mut self.buffer,
212                    |x, y| (x, y),
213                ),
214                StreamElement::Item(BinaryElement::Right(item)) => Self::add_item(
215                    ((self.keyer2)(&item), item),
216                    &mut self.right,
217                    &mut self.left,
218                    self.variant.right_outer(),
219                    self.variant.left_outer(),
220                    &mut self.buffer,
221                    |x, y| (y, x),
222                ),
223                StreamElement::Item(BinaryElement::LeftEnd) => {
224                    log::debug!(
225                        "Left side of join ended with {} elements on the left \
226                        and {} elements on the right",
227                        self.left.count,
228                        self.right.count
229                    );
230                    Self::side_ended(
231                        self.variant.right_outer(),
232                        &mut self.left,
233                        &mut self.right,
234                        &mut self.buffer,
235                        |x, y| (x, y),
236                    )
237                }
238                StreamElement::Item(BinaryElement::RightEnd) => {
239                    log::debug!(
240                        "Right side of join ended with {} elements on the left \
241                        and {} elements on the right",
242                        self.left.count,
243                        self.right.count
244                    );
245                    Self::side_ended(
246                        self.variant.left_outer(),
247                        &mut self.right,
248                        &mut self.left,
249                        &mut self.buffer,
250                        |x, y| (y, x),
251                    )
252                }
253                StreamElement::FlushAndRestart => {
254                    assert!(self.left.ended);
255                    assert!(self.right.ended);
256                    assert!(self.left.data.is_empty());
257                    assert!(self.right.data.is_empty());
258                    assert!(self.left.keys.is_empty());
259                    assert!(self.right.keys.is_empty());
260                    self.left.ended = false;
261                    self.left.count = 0;
262                    self.right.ended = false;
263                    self.right.count = 0;
264                    log::debug!("JoinLocalHash at {} emitted FlushAndRestart", self.coord);
265                    return StreamElement::FlushAndRestart;
266                }
267                StreamElement::Terminate => return StreamElement::Terminate,
268                StreamElement::FlushBatch => return StreamElement::FlushBatch,
269                StreamElement::Watermark(_) | StreamElement::Timestamped(_, _) => {
270                    panic!("Cannot yet join timestamped streams")
271                }
272            }
273        }
274
275        let item = self.buffer.pop_front().unwrap();
276        StreamElement::Item(item)
277    }
278
279    fn structure(&self) -> BlockStructure {
280        self.prev
281            .structure()
282            .add_operator(
283                OperatorStructure::new::<(Key, OuterJoinTuple<Out1, Out2>), _>("JoinLocalHash"),
284            )
285    }
286}
287
288/// This is an intermediate type for building a join operator.
289///
290/// The ship strategy has already been selected and it's stored in `ShipStrat`, the local strategy
291/// is hash and now the join variant has to be selected.
292///
293/// Note that `outer` join is not supported if the ship strategy is `broadcast_right`.
294pub struct JoinStreamLocalHash<
295    Key: DataKey,
296    Out1: ExchangeData,
297    Out2: ExchangeData,
298    Keyer1: KeyerFn<Key, Out1>,
299    Keyer2: KeyerFn<Key, Out2>,
300    ShipStrat: ShipStrategy,
301> {
302    stream: Stream<BinaryStartOperator<Out1, Out2>>,
303    keyer1: Keyer1,
304    keyer2: Keyer2,
305    _key: PhantomData<Key>,
306    _s: PhantomData<ShipStrat>,
307}
308
309impl<
310        Key: DataKey,
311        Out1: ExchangeData,
312        Out2: ExchangeData,
313        Keyer1,
314        Keyer2,
315        ShipStrat: ShipStrategy,
316    > JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipStrat>
317where
318    Keyer1: KeyerFn<Key, Out1>,
319    Keyer2: KeyerFn<Key, Out2>,
320{
321    pub(crate) fn new(
322        stream: Stream<BinaryStartOperator<Out1, Out2>>,
323        keyer1: Keyer1,
324        keyer2: Keyer2,
325    ) -> Self {
326        Self {
327            stream,
328            keyer1,
329            keyer2,
330            _key: Default::default(),
331            _s: Default::default(),
332        }
333    }
334}
335
336impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
337    JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipHash>
338where
339    Keyer1: KeyerFn<Key, Out1>,
340    Keyer2: KeyerFn<Key, Out2>,
341{
342    /// Finalize the join operator by specifying that this is an _inner join_.
343    ///
344    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
345    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
346    /// equal to the key obtained with `keyer2` on an item from the right.
347    ///
348    /// This is an inner join, very similarly to `SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b)`.
349    ///
350    /// **Note**: this operator will split the current block.
351    pub fn inner(self) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out1, Out2>)>> {
352        let keyer1 = self.keyer1;
353        let keyer2 = self.keyer2;
354        let inner = self
355            .stream
356            .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Inner, keyer1, keyer2));
357        KeyedStream(inner).map(|(_key, (lhs, rhs))| (lhs.unwrap(), rhs.unwrap()))
358    }
359
360    /// Finalize the join operator by specifying that this is a _left join_.
361    ///
362    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
363    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
364    /// equal to the key obtained with `keyer2` on an item from the right.
365    ///
366    /// This is a **left** join, meaning that if an item from the left does not find and element
367    /// from the right with which make a pair, an extra pair `(left, None)` is generated. If you
368    /// want to have a _right_ join, you just need to switch the two sides and use a left join.
369    ///
370    /// This is very similar to `SELECT a, b FROM a LEFT JOIN b ON keyer1(a) = keyer2(b)`.    
371    ///
372    /// **Note**: this operator will split the current block.
373    pub fn left(self) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out1, Out2>)>> {
374        let keyer1 = self.keyer1;
375        let keyer2 = self.keyer2;
376        let inner = self
377            .stream
378            .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Left, keyer1, keyer2));
379        KeyedStream(inner).map(|(_key, (lhs, rhs))| (lhs.unwrap(), rhs))
380    }
381
382    /// Finalize the join operator by specifying that this is an _outer join_.
383    ///
384    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
385    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
386    /// equal to the key obtained with `keyer2` on an item from the right.
387    ///
388    /// This is a **full-outer** join, meaning that if an item from the left does not find and element
389    /// from the right with which make a pair, an extra pair `(left, None)` is generated. Similarly
390    /// if an element from the right does not appear in any pair, a new one is generated with
391    /// `(None, right)`.
392    ///
393    /// This is very similar to `SELECT a, b FROM a FULL OUTER JOIN b ON keyer1(a) = keyer2(b)`.
394    ///
395    /// **Note**: this operator will split the current block.
396    pub fn outer(self) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out1, Out2>)>> {
397        let keyer1 = self.keyer1;
398        let keyer2 = self.keyer2;
399        let inner = self
400            .stream
401            .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Outer, keyer1, keyer2));
402        KeyedStream(inner)
403    }
404}
405
406impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
407    JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
408where
409    Keyer1: KeyerFn<Key, Out1>,
410    Keyer2: KeyerFn<Key, Out2>,
411{
412    /// Finalize the join operator by specifying that this is an _inner join_.
413    ///
414    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
415    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
416    /// equal to the key obtained with `keyer2` on an item from the right.
417    ///
418    /// This is an inner join, very similarly to `SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b)`.
419    ///
420    /// **Note**: this operator will split the current block.
421    pub fn inner(self) -> Stream<impl Operator<Out = (Key, InnerJoinTuple<Out1, Out2>)>> {
422        let keyer1 = self.keyer1;
423        let keyer2 = self.keyer2;
424        self.stream
425            .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Inner, keyer1, keyer2))
426            .map(|(key, (lhs, rhs))| (key, (lhs.unwrap(), rhs.unwrap())))
427    }
428
429    /// Finalize the join operator by specifying that this is a _left join_.
430    ///
431    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
432    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
433    /// equal to the key obtained with `keyer2` on an item from the right.
434    ///
435    /// This is a **left** join, meaning that if an item from the left does not find and element
436    /// from the right with which make a pair, an extra pair `(left, None)` is generated. If you
437    /// want to have a _right_ join, you just need to switch the two sides and use a left join.
438    ///
439    /// This is very similar to `SELECT a, b FROM a LEFT JOIN b ON keyer1(a) = keyer2(b)`.    
440    ///
441    /// **Note**: this operator will split the current block.
442    pub fn left(self) -> Stream<impl Operator<Out = (Key, LeftJoinTuple<Out1, Out2>)>> {
443        let keyer1 = self.keyer1;
444        let keyer2 = self.keyer2;
445        self.stream
446            .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Left, keyer1, keyer2))
447            .map(|(key, (lhs, rhs))| (key, (lhs.unwrap(), rhs)))
448    }
449}