noir_compute/operator/join/mod.rs
1//! Structures for building the join operators.
2//!
3//! The actual operators are [`Stream::join`], [`Stream::left_join`], [`Stream::outer_join`] and
4//! [`Stream::join_with`].
5
6use std::marker::PhantomData;
7
8pub use local_hash::JoinStreamLocalHash;
9pub use local_sort_merge::JoinStreamLocalSortMerge;
10pub use ship::{ShipBroadcastRight, ShipHash, ShipStrategy};
11
12pub use crate::operator::join::ship::{JoinStreamShipBroadcastRight, JoinStreamShipHash};
13use crate::operator::{Data, DataKey, ExchangeData, KeyerFn, Operator};
14use crate::stream::{KeyedStream, Stream};
15
16mod keyed_join;
17mod local_hash;
18mod local_sort_merge;
19mod ship;
20
/// Pair of items emitted by an inner join: both sides are always present.
pub type InnerJoinTuple<Out1, Out2> = (Out1, Out2);

/// Pair of items emitted by a left join: the right side is `None` when the left item did not
/// match any item on the right.
pub type LeftJoinTuple<Out1, Out2> = (Out1, Option<Out2>);

/// Pair of items emitted by an outer join: each side is optional, so that an item without a
/// match can be paired with `None`.
pub type OuterJoinTuple<Out1, Out2> = (Option<Out1>, Option<Out2>);
27
/// The flavour of join to perform: an inner, a left or a full outer join.
#[derive(Clone, Debug)]
pub(crate) enum JoinVariant {
    /// Inner join.
    Inner,
    /// Left outer join.
    ///
    /// Every element of the left side appears at least once in the output.
    Left,
    /// Full outer join.
    ///
    /// Every element of both sides appears in at least one output tuple.
    Outer,
}

impl JoinVariant {
    /// Returns `true` for the variants that are left outer, i.e. the left and the full outer
    /// joins.
    pub(crate) fn left_outer(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Left | Self::Outer => true,
            Self::Inner => false,
        }
    }

    /// Returns `true` for the variants that are right outer.
    ///
    /// Only the full outer join qualifies, since a right join is not supported.
    pub(crate) fn right_outer(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Outer => true,
            Self::Inner | Self::Left => false,
        }
    }
}
54
55/// Intermediate stream type for building the join between two streams.
56///
57/// This type has methods for selecting the ship strategy of the join, later you will be able to
58/// select the local strategy, and finally the variant of the join.
59pub struct JoinStream<
60 Key,
61 Out1: ExchangeData,
62 Out2: ExchangeData,
63 OperatorChain1,
64 OperatorChain2,
65 Keyer1,
66 Keyer2,
67> where
68 OperatorChain1: Operator<Out = Out1>,
69 OperatorChain2: Operator<Out = Out2>,
70 Keyer1: KeyerFn<Key, Out1>,
71 Keyer2: KeyerFn<Key, Out2>,
72{
73 /// The stream of the left side.
74 pub(crate) lhs: Stream<OperatorChain1>,
75 /// The stream of the right side.
76 pub(crate) rhs: Stream<OperatorChain2>,
77 /// The function for extracting the join key from the left stream.
78 pub(crate) keyer1: Keyer1,
79 /// The function for extracting the join key from the right stream.
80 pub(crate) keyer2: Keyer2,
81
82 _key: PhantomData<Key>,
83}
84
85impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
86where
87 OperatorChain: Operator<Out = Out> + 'static,
88{
89 /// Given two stream, create a stream with all the pairs (left item from the left stream, right
90 /// item from the right), such that the key obtained with `keyer1` on an item from the left is
91 /// equal to the key obtained with `keyer2` on an item from the right.
92 ///
93 /// This is an inner join, very similar to `SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b)`.
94 ///
95 /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().inner()`.
96 ///
97 /// **Note**: this operator will split the current block.
98 ///
99 /// ## Example
100 ///
101 /// ```
102 /// # use noir_compute::{StreamContext, RuntimeConfig};
103 /// # use noir_compute::operator::source::IteratorSource;
104 /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
105 /// let s1 = env.stream_iter(0..5u8);
106 /// let s2 = env.stream_iter(0..5i32);
107 /// let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
108 ///
109 /// env.execute_blocking();
110 ///
111 /// let mut res = res.get().unwrap();
112 /// res.sort_unstable();
113 /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
114 /// ```
115 pub fn join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
116 self,
117 rhs: Stream<OperatorChain2>,
118 keyer1: Keyer1,
119 keyer2: Keyer2,
120 ) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out, Out2>)>>
121 where
122 Key: DataKey,
123 OperatorChain2: Operator<Out = Out2> + 'static,
124 Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
125 Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
126 {
127 self.join_with(rhs, keyer1, keyer2)
128 .ship_hash()
129 .local_hash()
130 .inner()
131 }
132
133 /// Given two stream, create a stream with all the pairs (left item from the left stream, right
134 /// item from the right), such that the key obtained with `keyer1` on an item from the left is
135 /// equal to the key obtained with `keyer2` on an item from the right.
136 ///
137 /// This is a **left** join, meaning that if an item from the left does not find and element
138 /// from the right with which make a pair, an extra pair `(left, None)` is generated. If you
139 /// want to have a _right_ join, you just need to switch the two sides and use a left join.
140 ///
141 /// This is very similar to `SELECT a, b FROM a LEFT JOIN b ON keyer1(a) = keyer2(b)`.
142 ///
143 /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().left()`.
144 ///
145 /// **Note**: this operator will split the current block.
146 ///
147 /// ## Example
148 ///
149 /// ```
150 /// # use noir_compute::{StreamContext, RuntimeConfig};
151 /// # use noir_compute::operator::source::IteratorSource;
152 /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
153 /// let s1 = env.stream_iter(0..5u8);
154 /// let s2 = env.stream_iter(0..5i32);
155 /// let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
156 ///
157 /// env.execute_blocking();
158 ///
159 /// let mut res = res.get().unwrap();
160 /// res.sort_unstable();
161 /// assert_eq!(res, vec![(0, Some(0)), (0, Some(2)), (0, Some(4)), (1, Some(1)), (1, Some(3)), (2, None), (3, None), (4, None)]);
162 /// ```
163 pub fn left_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
164 self,
165 rhs: Stream<OperatorChain2>,
166 keyer1: Keyer1,
167 keyer2: Keyer2,
168 ) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out, Out2>)>>
169 where
170 Key: DataKey,
171 OperatorChain2: Operator<Out = Out2> + 'static,
172 Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
173 Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
174 {
175 self.join_with(rhs, keyer1, keyer2)
176 .ship_hash()
177 .local_hash()
178 .left()
179 }
180
181 /// Given two stream, create a stream with all the pairs (left item from the left stream, right
182 /// item from the right), such that the key obtained with `keyer1` on an item from the left is
183 /// equal to the key obtained with `keyer2` on an item from the right.
184 ///
185 /// This is a **full-outer** join, meaning that if an item from the left does not find and element
186 /// from the right with which make a pair, an extra pair `(left, None)` is generated. Similarly
187 /// if an element from the right does not appear in any pair, a new one is generated with
188 /// `(None, right)`.
189 ///
190 /// This is very similar to `SELECT a, b FROM a FULL OUTER JOIN b ON keyer1(a) = keyer2(b)`.
191 ///
192 /// This is a shortcut for: `self.join_with(...).ship_hash().local_hash().outer()`.
193 ///
194 /// **Note**: this operator will split the current block.
195 ///
196 /// ## Example
197 ///
198 /// ```
199 /// # use noir_compute::{StreamContext, RuntimeConfig};
200 /// # use noir_compute::operator::source::IteratorSource;
201 /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
202 /// let s1 = env.stream_iter(0..5u8);
203 /// let s2 = env.stream_iter(0..5i32);
204 /// let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
205 ///
206 /// env.execute_blocking();
207 ///
208 /// let mut res = res.get().unwrap();
209 /// res.sort_unstable();
210 /// assert_eq!(res, vec![(Some(0), Some(0)), (Some(0), Some(2)), (Some(0), Some(4)), (Some(1), Some(1)), (Some(1), Some(3)), (Some(2), None), (Some(3), None), (Some(4), None)]);
211 /// ```
212 pub fn outer_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
213 self,
214 rhs: Stream<OperatorChain2>,
215 keyer1: Keyer1,
216 keyer2: Keyer2,
217 ) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out, Out2>)>>
218 where
219 Key: DataKey,
220 OperatorChain2: Operator<Out = Out2> + 'static,
221 Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
222 Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
223 {
224 self.join_with(rhs, keyer1, keyer2)
225 .ship_hash()
226 .local_hash()
227 .outer()
228 }
229
230 /// Given two streams, start building a join operator.
231 ///
232 /// The returned type allows you to customize the behaviour of the join. You can select which
233 /// ship strategy and which local strategy to use.
234 ///
235 /// **Ship strategies**
236 ///
237 /// - _hash_: the hash of the key is used to select where to send the elements
238 /// - _broadcast right_: the left stream is left locally, while the right stream is broadcasted
239 ///
240 /// **Local strategies**
241 ///
242 /// - _hash_: build an hashmap to match the tuples
243 /// - _sort and merge_: collect all the tuples, sort them by key and merge them
244 ///
245 /// The first strategy to select is the _ship strategy_. After choosing that you have to select
246 /// the local strategy.
247 ///
248 /// ## Example
249 ///
250 /// ```
251 /// # use noir_compute::{StreamContext, RuntimeConfig};
252 /// # use noir_compute::operator::source::IteratorSource;
253 /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
254 /// let s1 = env.stream_iter(0..5u8);
255 /// let s2 = env.stream_iter(0..5i32);
256 /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash();
257 /// ```
258 ///
259 /// ```
260 /// # use noir_compute::{StreamContext, RuntimeConfig};
261 /// # use noir_compute::operator::source::IteratorSource;
262 /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
263 /// let s1 = env.stream_iter(0..5u8);
264 /// let s2 = env.stream_iter(0..5i32);
265 /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right();
266 /// ```
267 pub fn join_with<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
268 self,
269 rhs: Stream<OperatorChain2>,
270 keyer1: Keyer1,
271 keyer2: Keyer2,
272 ) -> JoinStream<Key, Out, Out2, OperatorChain, OperatorChain2, Keyer1, Keyer2>
273 where
274 OperatorChain2: Operator<Out = Out2>,
275 Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>,
276 Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,
277 {
278 JoinStream {
279 lhs: self,
280 rhs,
281 keyer1,
282 keyer2,
283 _key: PhantomData,
284 }
285 }
286}
287
288impl<
289 Key: Data,
290 Out1: ExchangeData,
291 Out2: ExchangeData,
292 OperatorChain1,
293 OperatorChain2,
294 Keyer1,
295 Keyer2,
296 > JoinStream<Key, Out1, Out2, OperatorChain1, OperatorChain2, Keyer1, Keyer2>
297where
298 OperatorChain1: Operator<Out = Out1> + 'static,
299 OperatorChain2: Operator<Out = Out2> + 'static,
300 Keyer1: KeyerFn<Key, Out1>,
301 Keyer2: KeyerFn<Key, Out2>,
302{
303 /// Use the Hash Repartition strategy.
304 ///
305 /// With this strategy the two streams are shuffled (like a group-by), pointing the message with
306 /// the same key to the same replica. The key must be hashable.
307 pub fn ship_hash(self) -> JoinStreamShipHash<Key, Out1, Out2, Keyer1, Keyer2>
308 where
309 Key: DataKey,
310 {
311 JoinStreamShipHash::new(self)
312 }
313
314 /// Use the Broadcast-Forward strategy.
315 ///
316 /// The left side won't be sent to the network, while the right side is broadcasted. This is
317 /// recommended when the left side is really big and the left side really small.
318 ///
319 /// This does not require the key to be hashable.
320 pub fn ship_broadcast_right(
321 self,
322 ) -> JoinStreamShipBroadcastRight<Key, Out1, Out2, Keyer1, Keyer2> {
323 JoinStreamShipBroadcastRight::new(self)
324 }
325}