1#![allow(clippy::type_complexity)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt::Display;
5use std::marker::PhantomData;
6
7use crate::block::{BlockStructure, OperatorStructure};
8use crate::network::Coord;
9use crate::operator::join::ship::{ShipBroadcastRight, ShipHash, ShipStrategy};
10use crate::operator::join::{InnerJoinTuple, JoinVariant, LeftJoinTuple, OuterJoinTuple};
11use crate::operator::start::{BinaryElement, BinaryStartOperator};
12use crate::operator::{DataKey, ExchangeData, KeyerFn, Operator, StreamElement};
13use crate::scheduler::ExecutionMetadata;
14use crate::stream::{KeyedStream, Stream};
15
16#[derive(Debug, Clone)]
18struct SideHashMap<Key: DataKey, Out> {
19 data: HashMap<Key, Vec<Out>, crate::block::GroupHasherBuilder>,
23 keys: HashSet<Key>,
27 ended: bool,
29 count: usize,
31}
32
33impl<Key: DataKey, Out> Default for SideHashMap<Key, Out> {
34 fn default() -> Self {
35 Self {
36 data: Default::default(),
37 keys: Default::default(),
38 ended: false,
39 count: 0,
40 }
41 }
42}
43
44#[derive(Clone, Debug)]
49struct JoinLocalHash<
50 Key: DataKey,
51 Out1: ExchangeData,
52 Out2: ExchangeData,
53 Keyer1: KeyerFn<Key, Out1>,
54 Keyer2: KeyerFn<Key, Out2>,
55 OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
56> {
57 prev: OperatorChain,
58 coord: Coord,
59
60 left: SideHashMap<Key, Out1>,
62 right: SideHashMap<Key, Out2>,
64
65 keyer1: Keyer1,
66 keyer2: Keyer2,
67
68 variant: JoinVariant,
73 buffer: VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
75}
76
77impl<
78 Key: DataKey,
79 Out1: ExchangeData,
80 Out2: ExchangeData,
81 Keyer1: KeyerFn<Key, Out1>,
82 Keyer2: KeyerFn<Key, Out2>,
83 OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
84 > Display for JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
85{
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(
88 f,
89 "{} -> JoinLocalHash<{}>",
90 self.prev,
91 std::any::type_name::<Key>()
92 )
93 }
94}
95
96impl<
97 Key: DataKey,
98 Out1: ExchangeData,
99 Out2: ExchangeData,
100 Keyer1: KeyerFn<Key, Out1>,
101 Keyer2: KeyerFn<Key, Out2>,
102 OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
103 > JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
104{
105 fn new(prev: OperatorChain, variant: JoinVariant, keyer1: Keyer1, keyer2: Keyer2) -> Self {
106 Self {
107 prev,
108 coord: Default::default(),
109 left: Default::default(),
110 right: Default::default(),
111 keyer1,
112 keyer2,
113 variant,
114 buffer: Default::default(),
115 }
116 }
117
118 fn add_item<OutL: ExchangeData, OutR: ExchangeData>(
122 (key, item): (Key, OutL),
123 left: &mut SideHashMap<Key, OutL>,
124 right: &mut SideHashMap<Key, OutR>,
125 left_outer: bool,
126 right_outer: bool,
127 buffer: &mut VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
128 make_pair: impl Fn(Option<OutL>, Option<OutR>) -> OuterJoinTuple<Out1, Out2>,
129 ) {
130 left.count += 1;
131 if let Some(right) = right.data.get(&key) {
132 for rhs in right {
134 buffer.push_back((
135 key.clone(),
136 make_pair(Some(item.clone()), Some(rhs.clone())),
137 ));
138 }
139 } else if right.ended && left_outer {
140 buffer.push_back((key.clone(), make_pair(Some(item.clone()), None)));
143 } else {
144 }
147 if right_outer {
148 left.keys.insert(key.clone());
149 }
150 if !right.ended {
151 left.data.entry(key).or_default().push(item);
152 }
153 }
154
155 fn side_ended<OutL, OutR>(
159 right_outer: bool,
160 left: &mut SideHashMap<Key, OutL>,
161 right: &mut SideHashMap<Key, OutR>,
162 buffer: &mut VecDeque<(Key, OuterJoinTuple<Out1, Out2>)>,
163 make_pair: impl Fn(Option<OutL>, Option<OutR>) -> OuterJoinTuple<Out1, Out2>,
164 ) {
165 if right_outer {
166 for (key, right) in right.data.drain() {
170 if !left.keys.contains(&key) {
171 for rhs in right {
172 buffer.push_back((key.clone(), make_pair(None, Some(rhs))));
173 }
174 }
175 }
176 } else {
177 right.data.clear();
179 }
180 left.keys.clear();
182 left.ended = true;
183 }
184}
185
186impl<
187 Key: DataKey,
188 Out1: ExchangeData,
189 Out2: ExchangeData,
190 Keyer1: KeyerFn<Key, Out1>,
191 Keyer2: KeyerFn<Key, Out2>,
192 OperatorChain: Operator<Out = BinaryElement<Out1, Out2>>,
193 > Operator for JoinLocalHash<Key, Out1, Out2, Keyer1, Keyer2, OperatorChain>
194{
195 type Out = (Key, OuterJoinTuple<Out1, Out2>);
196
197 fn setup(&mut self, metadata: &mut ExecutionMetadata) {
198 self.coord = metadata.coord;
199 self.prev.setup(metadata);
200 }
201
202 fn next(&mut self) -> StreamElement<(Key, OuterJoinTuple<Out1, Out2>)> {
203 while self.buffer.is_empty() {
204 match self.prev.next() {
205 StreamElement::Item(BinaryElement::Left(item)) => Self::add_item(
206 ((self.keyer1)(&item), item),
207 &mut self.left,
208 &mut self.right,
209 self.variant.left_outer(),
210 self.variant.right_outer(),
211 &mut self.buffer,
212 |x, y| (x, y),
213 ),
214 StreamElement::Item(BinaryElement::Right(item)) => Self::add_item(
215 ((self.keyer2)(&item), item),
216 &mut self.right,
217 &mut self.left,
218 self.variant.right_outer(),
219 self.variant.left_outer(),
220 &mut self.buffer,
221 |x, y| (y, x),
222 ),
223 StreamElement::Item(BinaryElement::LeftEnd) => {
224 log::debug!(
225 "Left side of join ended with {} elements on the left \
226 and {} elements on the right",
227 self.left.count,
228 self.right.count
229 );
230 Self::side_ended(
231 self.variant.right_outer(),
232 &mut self.left,
233 &mut self.right,
234 &mut self.buffer,
235 |x, y| (x, y),
236 )
237 }
238 StreamElement::Item(BinaryElement::RightEnd) => {
239 log::debug!(
240 "Right side of join ended with {} elements on the left \
241 and {} elements on the right",
242 self.left.count,
243 self.right.count
244 );
245 Self::side_ended(
246 self.variant.left_outer(),
247 &mut self.right,
248 &mut self.left,
249 &mut self.buffer,
250 |x, y| (y, x),
251 )
252 }
253 StreamElement::FlushAndRestart => {
254 assert!(self.left.ended);
255 assert!(self.right.ended);
256 assert!(self.left.data.is_empty());
257 assert!(self.right.data.is_empty());
258 assert!(self.left.keys.is_empty());
259 assert!(self.right.keys.is_empty());
260 self.left.ended = false;
261 self.left.count = 0;
262 self.right.ended = false;
263 self.right.count = 0;
264 log::debug!("JoinLocalHash at {} emitted FlushAndRestart", self.coord);
265 return StreamElement::FlushAndRestart;
266 }
267 StreamElement::Terminate => return StreamElement::Terminate,
268 StreamElement::FlushBatch => return StreamElement::FlushBatch,
269 StreamElement::Watermark(_) | StreamElement::Timestamped(_, _) => {
270 panic!("Cannot yet join timestamped streams")
271 }
272 }
273 }
274
275 let item = self.buffer.pop_front().unwrap();
276 StreamElement::Item(item)
277 }
278
279 fn structure(&self) -> BlockStructure {
280 self.prev
281 .structure()
282 .add_operator(
283 OperatorStructure::new::<(Key, OuterJoinTuple<Out1, Out2>), _>("JoinLocalHash"),
284 )
285 }
286}
287
288pub struct JoinStreamLocalHash<
295 Key: DataKey,
296 Out1: ExchangeData,
297 Out2: ExchangeData,
298 Keyer1: KeyerFn<Key, Out1>,
299 Keyer2: KeyerFn<Key, Out2>,
300 ShipStrat: ShipStrategy,
301> {
302 stream: Stream<BinaryStartOperator<Out1, Out2>>,
303 keyer1: Keyer1,
304 keyer2: Keyer2,
305 _key: PhantomData<Key>,
306 _s: PhantomData<ShipStrat>,
307}
308
309impl<
310 Key: DataKey,
311 Out1: ExchangeData,
312 Out2: ExchangeData,
313 Keyer1,
314 Keyer2,
315 ShipStrat: ShipStrategy,
316 > JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipStrat>
317where
318 Keyer1: KeyerFn<Key, Out1>,
319 Keyer2: KeyerFn<Key, Out2>,
320{
321 pub(crate) fn new(
322 stream: Stream<BinaryStartOperator<Out1, Out2>>,
323 keyer1: Keyer1,
324 keyer2: Keyer2,
325 ) -> Self {
326 Self {
327 stream,
328 keyer1,
329 keyer2,
330 _key: Default::default(),
331 _s: Default::default(),
332 }
333 }
334}
335
336impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
337 JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipHash>
338where
339 Keyer1: KeyerFn<Key, Out1>,
340 Keyer2: KeyerFn<Key, Out2>,
341{
342 pub fn inner(self) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out1, Out2>)>> {
352 let keyer1 = self.keyer1;
353 let keyer2 = self.keyer2;
354 let inner = self
355 .stream
356 .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Inner, keyer1, keyer2));
357 KeyedStream(inner).map(|(_key, (lhs, rhs))| (lhs.unwrap(), rhs.unwrap()))
358 }
359
360 pub fn left(self) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out1, Out2>)>> {
374 let keyer1 = self.keyer1;
375 let keyer2 = self.keyer2;
376 let inner = self
377 .stream
378 .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Left, keyer1, keyer2));
379 KeyedStream(inner).map(|(_key, (lhs, rhs))| (lhs.unwrap(), rhs))
380 }
381
382 pub fn outer(self) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out1, Out2>)>> {
397 let keyer1 = self.keyer1;
398 let keyer2 = self.keyer2;
399 let inner = self
400 .stream
401 .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Outer, keyer1, keyer2));
402 KeyedStream(inner)
403 }
404}
405
406impl<Key: DataKey, Out1: ExchangeData, Out2: ExchangeData, Keyer1, Keyer2>
407 JoinStreamLocalHash<Key, Out1, Out2, Keyer1, Keyer2, ShipBroadcastRight>
408where
409 Keyer1: KeyerFn<Key, Out1>,
410 Keyer2: KeyerFn<Key, Out2>,
411{
412 pub fn inner(self) -> Stream<impl Operator<Out = (Key, InnerJoinTuple<Out1, Out2>)>> {
422 let keyer1 = self.keyer1;
423 let keyer2 = self.keyer2;
424 self.stream
425 .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Inner, keyer1, keyer2))
426 .map(|(key, (lhs, rhs))| (key, (lhs.unwrap(), rhs.unwrap())))
427 }
428
429 pub fn left(self) -> Stream<impl Operator<Out = (Key, LeftJoinTuple<Out1, Out2>)>> {
443 let keyer1 = self.keyer1;
444 let keyer2 = self.keyer2;
445 self.stream
446 .add_operator(|prev| JoinLocalHash::new(prev, JoinVariant::Left, keyer1, keyer2))
447 .map(|(key, (lhs, rhs))| (key, (lhs.unwrap(), rhs)))
448 }
449}