1use std::fmt::{self, Debug};
23use std::ops::Sub;
24
25use arrow::datatypes::ArrowNativeType;
26use hashbrown::HashTable;
27use hashbrown::hash_table::Entry::{Occupied, Vacant};
28
/// Maps `u64` hash values to chains of row indices for join lookups.
///
/// Implementations only compare hash values, never the underlying keys, so
/// callers must still check candidate pairs for actual key equality.
pub trait JoinHashMapType: Send + Sync {
    /// Presumably extends per-row chain storage by `len` zeroed slots.
    ///
    /// NOTE(review): both implementations visible in this file are no-ops —
    /// confirm whether this hook is still required.
    fn extend_zero(&mut self, len: usize);

    /// Inserts every `(row, &hash)` pair yielded by `iter`, making `row` the
    /// newest entry of that hash's chain.
    ///
    /// `deleted_offset` is subtracted from `row` when indexing the per-row
    /// chain storage.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    );

    /// Returns `(input_indices, match_indices)` for every stored row whose hash
    /// equals the hash of an input row from `iter`.
    ///
    /// The first vector holds the input row index (as `u32`), the second the
    /// matched stored row. With `deleted_offset = Some(o)`, matched rows are
    /// reported relative to `o` and stored rows below `o` are skipped.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>);

    /// Matches `hash_values` against the map, emitting at most `limit` pairs
    /// into `input_indices` / `match_indices` (both are cleared first).
    ///
    /// Processing starts at `offset`; the return value is the offset to resume
    /// from on the next call, or `None` once all of `hash_values` is consumed.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: JoinHashMapOffset,
        input_indices: &mut Vec<u32>,
        match_indices: &mut Vec<u64>,
    ) -> Option<JoinHashMapOffset>;

    /// Returns `true` if no hash value has been inserted.
    fn is_empty(&self) -> bool;

    /// Returns the number of distinct hash values stored.
    ///
    /// Rows sharing a hash share one entry, so this is not the row count.
    fn len(&self) -> usize;
}
133
/// Join hash map that stores chain positions as `u32`.
///
/// Positions are 1-based so that `0` can terminate a chain: `map` keeps the
/// most recently inserted position per hash, and `next` links each row to the
/// previously inserted row with the same hash.
pub struct JoinHashMapU32 {
    /// `(hash, 1-based position of the newest row with this hash)`.
    map: HashTable<(u64, u32)>,
    /// Per-row link to the previous row in the same chain, indexed by
    /// `row - deleted_offset` (1-based value; `0` = end of chain).
    next: Vec<u32>,
}
140
141impl JoinHashMapU32 {
142 #[cfg(test)]
143 pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec<u32>) -> Self {
144 Self { map, next }
145 }
146
147 pub fn with_capacity(cap: usize) -> Self {
148 Self {
149 map: HashTable::with_capacity(cap),
150 next: vec![0; cap],
151 }
152 }
153}
154
155impl Debug for JoinHashMapU32 {
156 fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
157 Ok(())
158 }
159}
160
161impl JoinHashMapType for JoinHashMapU32 {
162 fn extend_zero(&mut self, _: usize) {}
163
164 fn update_from_iter<'a>(
165 &mut self,
166 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
167 deleted_offset: usize,
168 ) {
169 update_from_iter::<u32>(&mut self.map, &mut self.next, iter, deleted_offset);
170 }
171
172 fn get_matched_indices<'a>(
173 &self,
174 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
175 deleted_offset: Option<usize>,
176 ) -> (Vec<u32>, Vec<u64>) {
177 get_matched_indices::<u32>(&self.map, &self.next, iter, deleted_offset)
178 }
179
180 fn get_matched_indices_with_limit_offset(
181 &self,
182 hash_values: &[u64],
183 limit: usize,
184 offset: JoinHashMapOffset,
185 input_indices: &mut Vec<u32>,
186 match_indices: &mut Vec<u64>,
187 ) -> Option<JoinHashMapOffset> {
188 get_matched_indices_with_limit_offset::<u32>(
189 &self.map,
190 &self.next,
191 hash_values,
192 limit,
193 offset,
194 input_indices,
195 match_indices,
196 )
197 }
198
199 fn is_empty(&self) -> bool {
200 self.map.is_empty()
201 }
202
203 fn len(&self) -> usize {
204 self.map.len()
205 }
206}
207
/// Join hash map that stores chain positions as `u64`.
///
/// Positions are 1-based so that `0` can terminate a chain: `map` keeps the
/// most recently inserted position per hash, and `next` links each row to the
/// previously inserted row with the same hash.
pub struct JoinHashMapU64 {
    /// `(hash, 1-based position of the newest row with this hash)`.
    map: HashTable<(u64, u64)>,
    /// Per-row link to the previous row in the same chain, indexed by
    /// `row - deleted_offset` (1-based value; `0` = end of chain).
    next: Vec<u64>,
}
214
215impl JoinHashMapU64 {
216 #[cfg(test)]
217 pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
218 Self { map, next }
219 }
220
221 pub fn with_capacity(cap: usize) -> Self {
222 Self {
223 map: HashTable::with_capacity(cap),
224 next: vec![0; cap],
225 }
226 }
227}
228
229impl Debug for JoinHashMapU64 {
230 fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
231 Ok(())
232 }
233}
234
235impl JoinHashMapType for JoinHashMapU64 {
236 fn extend_zero(&mut self, _: usize) {}
237
238 fn update_from_iter<'a>(
239 &mut self,
240 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
241 deleted_offset: usize,
242 ) {
243 update_from_iter::<u64>(&mut self.map, &mut self.next, iter, deleted_offset);
244 }
245
246 fn get_matched_indices<'a>(
247 &self,
248 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
249 deleted_offset: Option<usize>,
250 ) -> (Vec<u32>, Vec<u64>) {
251 get_matched_indices::<u64>(&self.map, &self.next, iter, deleted_offset)
252 }
253
254 fn get_matched_indices_with_limit_offset(
255 &self,
256 hash_values: &[u64],
257 limit: usize,
258 offset: JoinHashMapOffset,
259 input_indices: &mut Vec<u32>,
260 match_indices: &mut Vec<u64>,
261 ) -> Option<JoinHashMapOffset> {
262 get_matched_indices_with_limit_offset::<u64>(
263 &self.map,
264 &self.next,
265 hash_values,
266 limit,
267 offset,
268 input_indices,
269 match_indices,
270 )
271 }
272
273 fn is_empty(&self) -> bool {
274 self.map.is_empty()
275 }
276
277 fn len(&self) -> usize {
278 self.map.len()
279 }
280}
281
/// Resume point for `get_matched_indices_with_limit_offset`.
///
/// - `.0`: index into the input `hash_values` slice to continue from.
/// - `.1`: `None` looks that row up from scratch. `Some(n)` continues that
///   row's match chain at 1-based chain position `n`. `Some(0)` means the
///   row's chain is exhausted and processing resumes at the following row.
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
284
285#[inline(always)]
289fn traverse_chain<T>(
290 next_chain: &[T],
291 input_idx: usize,
292 start_chain_idx: T,
293 remaining: &mut usize,
294 input_indices: &mut Vec<u32>,
295 match_indices: &mut Vec<u64>,
296 is_last_input: bool,
297) -> Option<JoinHashMapOffset>
298where
299 T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
300 <T as TryFrom<usize>>::Error: Debug,
301 T: ArrowNativeType,
302{
303 let zero = T::usize_as(0);
304 let one = T::usize_as(1);
305 let mut match_row_idx = start_chain_idx - one;
306
307 loop {
308 match_indices.push(match_row_idx.into());
309 input_indices.push(input_idx as u32);
310 *remaining -= 1;
311
312 let next = next_chain[match_row_idx.into() as usize];
313
314 if *remaining == 0 {
315 return if is_last_input && next == zero {
317 None
319 } else {
320 Some((input_idx, Some(next.into())))
321 };
322 }
323 if next == zero {
324 return None;
326 }
327 match_row_idx = next - one;
328 }
329}
330
331pub fn update_from_iter<'a, T>(
332 map: &mut HashTable<(u64, T)>,
333 next: &mut [T],
334 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
335 deleted_offset: usize,
336) where
337 T: Copy + TryFrom<usize> + PartialOrd,
338 <T as TryFrom<usize>>::Error: Debug,
339{
340 for (row, &hash_value) in iter {
341 let entry = map.entry(
342 hash_value,
343 |&(hash, _)| hash_value == hash,
344 |&(hash, _)| hash,
345 );
346
347 match entry {
348 Occupied(mut occupied_entry) => {
349 let (_, index) = occupied_entry.get_mut();
351 let prev_index = *index;
352 *index = T::try_from(row + 1).unwrap();
354 next[row - deleted_offset] = prev_index;
356 }
357 Vacant(vacant_entry) => {
358 vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap()));
359 }
360 }
361 }
362}
363
364pub fn get_matched_indices<'a, T>(
365 map: &HashTable<(u64, T)>,
366 next: &[T],
367 iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
368 deleted_offset: Option<usize>,
369) -> (Vec<u32>, Vec<u64>)
370where
371 T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
372 <T as TryFrom<usize>>::Error: Debug,
373{
374 let mut input_indices = vec![];
375 let mut match_indices = vec![];
376 let zero = T::try_from(0).unwrap();
377 let one = T::try_from(1).unwrap();
378
379 for (row_idx, hash_value) in iter {
380 if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash)
382 {
383 let mut i = *index - one;
384 loop {
385 let match_row_idx = if let Some(offset) = deleted_offset {
386 let offset = T::try_from(offset).unwrap();
387 if i < offset {
389 break;
391 }
392 i - offset
393 } else {
394 i
395 };
396 match_indices.push(match_row_idx.into());
397 input_indices.push(row_idx as u32);
398 let next_chain = next[match_row_idx.into() as usize];
400 if next_chain == zero {
401 break;
403 }
404 i = next_chain - one;
405 }
406 }
407 }
408
409 (input_indices, match_indices)
410}
411
412pub fn get_matched_indices_with_limit_offset<T>(
413 map: &HashTable<(u64, T)>,
414 next_chain: &[T],
415 hash_values: &[u64],
416 limit: usize,
417 offset: JoinHashMapOffset,
418 input_indices: &mut Vec<u32>,
419 match_indices: &mut Vec<u64>,
420) -> Option<JoinHashMapOffset>
421where
422 T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
423 <T as TryFrom<usize>>::Error: Debug,
424 T: ArrowNativeType,
425{
426 input_indices.clear();
428 match_indices.clear();
429 let one = T::try_from(1).unwrap();
430
431 if map.len() == next_chain.len() {
434 let start = offset.0;
435 let end = (start + limit).min(hash_values.len());
436 for (i, &hash) in hash_values[start..end].iter().enumerate() {
437 if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
438 input_indices.push(start as u32 + i as u32);
439 match_indices.push((*idx - one).into());
440 }
441 }
442 return if end == hash_values.len() {
443 None
444 } else {
445 Some((end, None))
446 };
447 }
448
449 let mut remaining_output = limit;
450
451 let to_skip = match offset {
453 (idx, None) => idx,
455 (idx, Some(0)) => idx + 1,
458 (idx, Some(next_idx)) => {
461 let next_idx: T = T::usize_as(next_idx as usize);
462 let is_last = idx == hash_values.len() - 1;
463 if let Some(next_offset) = traverse_chain(
464 next_chain,
465 idx,
466 next_idx,
467 &mut remaining_output,
468 input_indices,
469 match_indices,
470 is_last,
471 ) {
472 return Some(next_offset);
473 }
474 idx + 1
475 }
476 };
477
478 let hash_values_len = hash_values.len();
479 for (i, &hash) in hash_values[to_skip..].iter().enumerate() {
480 let row_idx = to_skip + i;
481 if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
482 let idx: T = *idx;
483 let is_last = row_idx == hash_values_len - 1;
484 if let Some(next_offset) = traverse_chain(
485 next_chain,
486 row_idx,
487 idx,
488 &mut remaining_output,
489 input_indices,
490 match_indices,
491 is_last,
492 ) {
493 return Some(next_offset);
494 }
495 }
496 }
497 None
498}