noir_compute/operator/join/
mod.rs

1//! Structures for building the join operators.
2//!
3//! The actual operators are [`Stream::join`], [`Stream::left_join`], [`Stream::outer_join`] and
4//! [`Stream::join_with`].
5
6use std::marker::PhantomData;
7
8pub use local_hash::JoinStreamLocalHash;
9pub use local_sort_merge::JoinStreamLocalSortMerge;
10pub use ship::{ShipBroadcastRight, ShipHash, ShipStrategy};
11
12pub use crate::operator::join::ship::{JoinStreamShipBroadcastRight, JoinStreamShipHash};
13use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
14use crate::stream::{KeyedStream, Stream};
15
16mod keyed_join;
17mod local_hash;
18mod local_sort_merge;
19mod ship;
20
/// Item produced by an inner join: a left element paired with a matching right element.
pub type InnerJoinTuple<Out1, Out2> = (Out1, Out2);
/// Item produced by a left join: the right element is `None` when the left one found no match.
pub type LeftJoinTuple<Out1, Out2> = (Out1, Option<Out2>);
/// Item produced by a full outer join: one of the two sides is `None` when the other found no match.
pub type OuterJoinTuple<Out1, Out2> = (Option<Out1>, Option<Out2>);
27
/// The variant of the join: inner, left outer or full outer.
#[derive(Clone, Debug)]
pub(crate) enum JoinVariant {
    /// The join is an inner join.
    ///
    /// Only the pairs of matching elements appear in the output.
    Inner,
    /// The join is a left outer join.
    ///
    /// This means that all the left elements will appear at least once in the output.
    Left,
    /// The join is a full outer join.
    ///
    /// This means that all the elements, from both sides, will appear in at least one output
    /// tuple.
    Outer,
}

impl JoinVariant {
    /// Whether unmatched left elements must be emitted (true for left and full outer joins).
    pub(crate) fn left_outer(&self) -> bool {
        matches!(self, JoinVariant::Left | JoinVariant::Outer)
    }

    /// Whether unmatched right elements must be emitted (true only for full outer joins, since
    /// there is no right-join variant: swap the sides and use a left join instead).
    pub(crate) fn right_outer(&self) -> bool {
        matches!(self, JoinVariant::Outer)
    }
}
54
55/// Intermediate stream type for building the join between two streams.
56///
57/// This type has methods for selecting the ship strategy of the join, later you will be able to
58/// select the local strategy, and finally the variant of the join.
59pub struct JoinStream<
60    Key,
61    Out1: ExchangeData,
62    Out2: ExchangeData,
63    OperatorChain1,
64    OperatorChain2,
65    Keyer1,
66    Keyer2,
67> where
68    OperatorChain1: Operator<Out = Out1>,
69    OperatorChain2: Operator<Out = Out2>,
70    Keyer1: KeyerFn<Key, Out1>,
71    Keyer2: KeyerFn<Key, Out2>,
72{
73    /// The stream of the left side.
74    pub(crate) lhs: Stream<OperatorChain1>,
75    /// The stream of the right side.
76    pub(crate) rhs: Stream<OperatorChain2>,
77    /// The function for extracting the join key from the left stream.
78    pub(crate) keyer1: Keyer1,
79    /// The function for extracting the join key from the right stream.
80    pub(crate) keyer2: Keyer2,
81
82    _key: PhantomData<Key>,
83}
84
85impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
86where
87    OperatorChain: Operator<Out = Out> + 'static,
88{
89    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
90    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
91    /// equal to the key obtained with `keyer2` on an item from the right.
92    ///
93    /// This is an inner join, very similar to `SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b)`.
94    ///
95    /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().inner()`.
96    ///
97    /// **Note**: this operator will split the current block.
98    ///
99    /// ## Example
100    ///
101    /// ```
102    /// # use noir_compute::{StreamContext, RuntimeConfig};
103    /// # use noir_compute::operator::source::IteratorSource;
104    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
105    /// let s1 = env.stream_iter(0..5u8);
106    /// let s2 = env.stream_iter(0..5i32);
107    /// let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
108    ///
109    /// env.execute_blocking();
110    ///
111    /// let mut res = res.get().unwrap();
112    /// res.sort_unstable();
113    /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
114    /// ```
115    pub fn join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
116        self,
117        rhs: Stream<OperatorChain2>,
118        keyer1: Keyer1,
119        keyer2: Keyer2,
120    ) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out, Out2>)>>
121    where
122        Key: DataKey,
123        OperatorChain2: Operator<Out = Out2> + 'static,
124        Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
125        Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
126    {
127        self.join_with(rhs, keyer1, keyer2)
128            .ship_hash()
129            .local_hash()
130            .inner()
131    }
132
133    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
134    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
135    /// equal to the key obtained with `keyer2` on an item from the right.
136    ///
137    /// This is a **left** join, meaning that if an item from the left does not find and element
138    /// from the right with which make a pair, an extra pair `(left, None)` is generated. If you
139    /// want to have a _right_ join, you just need to switch the two sides and use a left join.
140    ///
141    /// This is very similar to `SELECT a, b FROM a LEFT JOIN b ON keyer1(a) = keyer2(b)`.
142    ///
143    /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().left()`.
144    ///
145    /// **Note**: this operator will split the current block.
146    ///
147    /// ## Example
148    ///
149    /// ```
150    /// # use noir_compute::{StreamContext, RuntimeConfig};
151    /// # use noir_compute::operator::source::IteratorSource;
152    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
153    /// let s1 = env.stream_iter(0..5u8);
154    /// let s2 = env.stream_iter(0..5i32);
155    /// let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
156    ///
157    /// env.execute_blocking();
158    ///
159    /// let mut res = res.get().unwrap();
160    /// res.sort_unstable();
161    /// assert_eq!(res, vec![(0, Some(0)), (0, Some(2)), (0, Some(4)), (1, Some(1)), (1, Some(3)), (2, None), (3, None), (4, None)]);
162    /// ```
163    pub fn left_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
164        self,
165        rhs: Stream<OperatorChain2>,
166        keyer1: Keyer1,
167        keyer2: Keyer2,
168    ) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out, Out2>)>>
169    where
170        Key: DataKey,
171        OperatorChain2: Operator<Out = Out2> + 'static,
172        Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
173        Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
174    {
175        self.join_with(rhs, keyer1, keyer2)
176            .ship_hash()
177            .local_hash()
178            .left()
179    }
180
181    /// Given two stream, create a stream with all the pairs (left item from the left stream, right
182    /// item from the right), such that the key obtained with `keyer1` on an item from the left is
183    /// equal to the key obtained with `keyer2` on an item from the right.
184    ///
185    /// This is a **full-outer** join, meaning that if an item from the left does not find and element
186    /// from the right with which make a pair, an extra pair `(left, None)` is generated. Similarly
187    /// if an element from the right does not appear in any pair, a new one is generated with
188    /// `(None, right)`.
189    ///
190    /// This is very similar to `SELECT a, b FROM a FULL OUTER JOIN b ON keyer1(a) = keyer2(b)`.
191    ///
192    /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().outer()`.
193    ///
194    /// **Note**: this operator will split the current block.
195    ///
196    /// ## Example
197    ///
198    /// ```
199    /// # use noir_compute::{StreamContext, RuntimeConfig};
200    /// # use noir_compute::operator::source::IteratorSource;
201    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
202    /// let s1 = env.stream_iter(0..5u8);
203    /// let s2 = env.stream_iter(0..5i32);
204    /// let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
205    ///
206    /// env.execute_blocking();
207    ///
208    /// let mut res = res.get().unwrap();
209    /// res.sort_unstable();
210    /// assert_eq!(res, vec![(Some(0), Some(0)), (Some(0), Some(2)), (Some(0), Some(4)), (Some(1), Some(1)), (Some(1), Some(3)), (Some(2), None), (Some(3), None), (Some(4), None)]);
211    /// ```
212    pub fn outer_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
213        self,
214        rhs: Stream<OperatorChain2>,
215        keyer1: Keyer1,
216        keyer2: Keyer2,
217    ) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out, Out2>)>>
218    where
219        Key: DataKey,
220        OperatorChain2: Operator<Out = Out2> + 'static,
221        Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
222        Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
223    {
224        self.join_with(rhs, keyer1, keyer2)
225            .ship_hash()
226            .local_hash()
227            .outer()
228    }
229
230    /// Given two streams, start building a join operator.
231    ///
232    /// The returned type allows you to customize the behaviour of the join. You can select which
233    /// ship strategy and which local strategy to use.
234    ///
235    /// **Ship strategies**
236    ///
237    /// - _hash_: the hash of the key is used to select where to send the elements
238    /// - _broadcast right_: the left stream is left locally, while the right stream is broadcasted
239    ///
240    /// **Local strategies**
241    ///
242    /// - _hash_: build an hashmap to match the tuples
243    /// - _sort and merge_: collect all the tuples, sort them by key and merge them
244    ///
245    /// The first strategy to select is the _ship strategy_. After choosing that you have to select
246    /// the local strategy.
247    ///
248    /// ## Example
249    ///
250    /// ```
251    /// # use noir_compute::{StreamContext, RuntimeConfig};
252    /// # use noir_compute::operator::source::IteratorSource;
253    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
254    /// let s1 = env.stream_iter(0..5u8);
255    /// let s2 = env.stream_iter(0..5i32);
256    /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash();
257    /// ```
258    ///
259    /// ```
260    /// # use noir_compute::{StreamContext, RuntimeConfig};
261    /// # use noir_compute::operator::source::IteratorSource;
262    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
263    /// let s1 = env.stream_iter(0..5u8);
264    /// let s2 = env.stream_iter(0..5i32);
265    /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right();
266    /// ```
267    pub fn join_with<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
268        self,
269        rhs: Stream<OperatorChain2>,
270        keyer1: Keyer1,
271        keyer2: Keyer2,
272    ) -> JoinStream<Key, Out, Out2, OperatorChain, OperatorChain2, Keyer1, Keyer2>
273    where
274        OperatorChain2: Operator<Out = Out2>,
275        Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
276        Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
277    {
278        JoinStream {
279            lhs: self,
280            rhs,
281            keyer1,
282            keyer2,
283            _key: PhantomData,
284        }
285    }
286}
287
288impl<
289        Key: Data,
290        Out1: ExchangeData,
291        Out2: ExchangeData,
292        OperatorChain1,
293        OperatorChain2,
294        Keyer1,
295        Keyer2,
296    > JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>
297where
298    OperatorChain1: Operator<Out = Out1> + 'static,
299    OperatorChain2: Operator<Out = Out2> + 'static,
300    Keyer1: KeyerFn<Key, Out1>,
301    Keyer2: KeyerFn<Key, Out2>,
302{
303    /// Use the Hash Repartition strategy.
304    ///
305    /// With this strategy the two streams are shuffled (like a group-by), pointing the message with
306    /// the same key to the same replica. The key must be hashable.
307    pub fn ship_hash(self) -> JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
308    where
309        Key: DataKey,
310    {
311        JoinStreamShipHash::new(self)
312    }
313
314    /// Use the Broadcast-Forward strategy.
315    ///
316    /// The left side won't be sent to the network, while the right side is broadcasted. This is
317    /// recommended when the left side is really big and the left side really small.
318    ///
319    /// This does not require the key to be hashable.
320    pub fn ship_broadcast_right(
321        self,
322    ) -> JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2> {
323        JoinStreamShipBroadcastRight::new(self)
324    }
325}