Skip to main content

datafusion_physical_plan/joins/
join_hash_map.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This file contains the implementation of the `JoinHashMap` struct, which
19//! is used to store the mapping between hash values based on the build side
20//! ["on" values] to a list of indices with this key's value.
21
22use std::fmt::{self, Debug};
23use std::ops::Sub;
24
25use arrow::datatypes::ArrowNativeType;
26use hashbrown::HashTable;
27use hashbrown::hash_table::Entry::{Occupied, Vacant};
28
29/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
30///
31/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
32/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
33///
34/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
35/// As the key is a hash value, we need to check possible hash collisions in the probe stage
/// During this stage it might be the case that a probe row maps to the same hash value as a build row,
/// but the values don't match. Those are checked in the `equal_rows_arr` method.
38///
39/// The indices (values) are stored in a separate chained list stored as `Vec<u32>` or `Vec<u64>`.
40///
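/// The most recently inserted row index (+1) for a hash value is stored in the hashmap, whereas the previously inserted index (+1) for that hash value is stored in the `next` array at the position of the newer row index.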
42///
43/// The chain can be followed until the value "0" has been reached, meaning the end of the list.
44/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
45///
46/// # Example
47///
48/// ``` text
49/// See the example below:
50///
51/// Insert (10,1)            <-- insert hash value 10 with row index 1
52/// map:
53/// ----------
54/// | 10 | 2 |
55/// ----------
56/// next:
57/// ---------------------
58/// | 0 | 0 | 0 | 0 | 0 |
59/// ---------------------
60/// Insert (20,2)
61/// map:
62/// ----------
63/// | 10 | 2 |
64/// | 20 | 3 |
65/// ----------
66/// next:
67/// ---------------------
68/// | 0 | 0 | 0 | 0 | 0 |
69/// ---------------------
70/// Insert (10,3)           <-- collision! row index 3 has a hash value of 10 as well
71/// map:
72/// ----------
73/// | 10 | 4 |
74/// | 20 | 3 |
75/// ----------
76/// next:
77/// ---------------------
78/// | 0 | 0 | 0 | 2 | 0 |  <--- hash value 10 maps to 4,2 (which means indices values 3,1)
79/// ---------------------
80/// Insert (10,4)          <-- another collision! row index 4 ALSO has a hash value of 10
81/// map:
82/// ---------
83/// | 10 | 5 |
84/// | 20 | 3 |
85/// ---------
86/// next:
87/// ---------------------
88/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
89/// ---------------------
90/// ```
91///
92/// Here we have an option between creating a `JoinHashMapType` using `u32` or `u64` indices
93/// based on how many rows were being used for indices.
94///
/// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64`, which both implement
/// `JoinHashMapType`.
97///
98/// ## Note on use of this trait as a public API
99/// This is currently a public trait but is mainly intended for internal use within DataFusion.
100/// For example, we may compare references to `JoinHashMapType` implementations by pointer equality
101/// rather than deep equality of contents, as deep equality would be expensive and in our usage
102/// patterns it is impossible for two different hash maps to have identical contents in a practical sense.
pub trait JoinHashMapType: Send + Sync {
    /// Extension hook that receives a length `len`.
    ///
    /// Both implementations in this file (`JoinHashMapU32` and
    /// `JoinHashMapU64`) implement it as a no-op.
    /// NOTE(review): the name suggests that other implementors append `len`
    /// zeroed slots to their chain storage — confirm against them.
    fn extend_zero(&mut self, len: usize);

    /// Inserts rows into the map.
    ///
    /// `iter` yields `(row, hash)` pairs. In the implementations in this file
    /// each row is recorded under its hash value as `row + 1`; when the hash
    /// value is already present, the previously recorded value is written to
    /// the chain list at position `row - deleted_offset`.
    fn update_from_iter<'a>(
        &mut self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
        deleted_offset: usize,
    );

    /// Looks up every probed hash value and returns all rows stored for it.
    ///
    /// `iter` yields `(probe_row, hash)` pairs. The result is a pair of
    /// parallel vectors `(input_indices, match_indices)`: one entry per match,
    /// holding the probe row index and the matching stored row index.
    ///
    /// When `deleted_offset` is `Some(offset)`, the implementations in this
    /// file stop following a chain as soon as a stored index is below
    /// `offset`, and report matched indices relative to `offset`.
    fn get_matched_indices<'a>(
        &self,
        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
        deleted_offset: Option<usize>,
    ) -> (Vec<u32>, Vec<u64>);

    /// Resumable lookup that writes matches into caller-provided buffers.
    ///
    /// In the implementations in this file, `input_indices` and
    /// `match_indices` are cleared and then filled with parallel
    /// probe-row / matched-row indices for `hash_values`, resuming at
    /// `offset`. `limit` bounds the work done by one call.
    ///
    /// Returns `Some(next_offset)` to be passed to the following call when
    /// processing stopped early, or `None` once all of `hash_values` has been
    /// processed.
    fn get_matched_indices_with_limit_offset(
        &self,
        hash_values: &[u64],
        limit: usize,
        offset: JoinHashMapOffset,
        input_indices: &mut Vec<u32>,
        match_indices: &mut Vec<u64>,
    ) -> Option<JoinHashMapOffset>;

    /// Returns `true` if the join hash map contains no entries.
    fn is_empty(&self) -> bool;

    /// Returns the number of entries in the join hash map.
    ///
    /// For the implementations in this file this is the number of entries in
    /// the underlying hash table (one per distinct hash value), not the
    /// number of rows that were inserted.
    fn len(&self) -> usize;
}
133
/// Join hash map that stores row indices as `u32`.
pub struct JoinHashMapU32 {
    /// Maps each hash value to the last row index inserted for it.
    /// The index is stored as `row + 1`, so `0` never denotes a row.
    map: HashTable<(u64, u32)>,
    /// Chained list of indices: the slot for a row holds the value (`row + 1`)
    /// that was previously stored for the same hash, or `0` at the end of
    /// the chain.
    next: Vec<u32>,
}
140
141impl JoinHashMapU32 {
142    #[cfg(test)]
143    pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec<u32>) -> Self {
144        Self { map, next }
145    }
146
147    pub fn with_capacity(cap: usize) -> Self {
148        Self {
149            map: HashTable::with_capacity(cap),
150            next: vec![0; cap],
151        }
152    }
153}
154
155impl Debug for JoinHashMapU32 {
156    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
157        Ok(())
158    }
159}
160
161impl JoinHashMapType for JoinHashMapU32 {
162    fn extend_zero(&mut self, _: usize) {}
163
164    fn update_from_iter<'a>(
165        &mut self,
166        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
167        deleted_offset: usize,
168    ) {
169        update_from_iter::<u32>(&mut self.map, &mut self.next, iter, deleted_offset);
170    }
171
172    fn get_matched_indices<'a>(
173        &self,
174        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
175        deleted_offset: Option<usize>,
176    ) -> (Vec<u32>, Vec<u64>) {
177        get_matched_indices::<u32>(&self.map, &self.next, iter, deleted_offset)
178    }
179
180    fn get_matched_indices_with_limit_offset(
181        &self,
182        hash_values: &[u64],
183        limit: usize,
184        offset: JoinHashMapOffset,
185        input_indices: &mut Vec<u32>,
186        match_indices: &mut Vec<u64>,
187    ) -> Option<JoinHashMapOffset> {
188        get_matched_indices_with_limit_offset::<u32>(
189            &self.map,
190            &self.next,
191            hash_values,
192            limit,
193            offset,
194            input_indices,
195            match_indices,
196        )
197    }
198
199    fn is_empty(&self) -> bool {
200        self.map.is_empty()
201    }
202
203    fn len(&self) -> usize {
204        self.map.len()
205    }
206}
207
/// Join hash map that stores row indices as `u64`.
pub struct JoinHashMapU64 {
    /// Maps each hash value to the last row index inserted for it.
    /// The index is stored as `row + 1`, so `0` never denotes a row.
    map: HashTable<(u64, u64)>,
    /// Chained list of indices: the slot for a row holds the value (`row + 1`)
    /// that was previously stored for the same hash, or `0` at the end of
    /// the chain.
    next: Vec<u64>,
}
214
215impl JoinHashMapU64 {
216    #[cfg(test)]
217    pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
218        Self { map, next }
219    }
220
221    pub fn with_capacity(cap: usize) -> Self {
222        Self {
223            map: HashTable::with_capacity(cap),
224            next: vec![0; cap],
225        }
226    }
227}
228
229impl Debug for JoinHashMapU64 {
230    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
231        Ok(())
232    }
233}
234
235impl JoinHashMapType for JoinHashMapU64 {
236    fn extend_zero(&mut self, _: usize) {}
237
238    fn update_from_iter<'a>(
239        &mut self,
240        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
241        deleted_offset: usize,
242    ) {
243        update_from_iter::<u64>(&mut self.map, &mut self.next, iter, deleted_offset);
244    }
245
246    fn get_matched_indices<'a>(
247        &self,
248        iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
249        deleted_offset: Option<usize>,
250    ) -> (Vec<u32>, Vec<u64>) {
251        get_matched_indices::<u64>(&self.map, &self.next, iter, deleted_offset)
252    }
253
254    fn get_matched_indices_with_limit_offset(
255        &self,
256        hash_values: &[u64],
257        limit: usize,
258        offset: JoinHashMapOffset,
259        input_indices: &mut Vec<u32>,
260        match_indices: &mut Vec<u64>,
261    ) -> Option<JoinHashMapOffset> {
262        get_matched_indices_with_limit_offset::<u64>(
263            &self.map,
264            &self.next,
265            hash_values,
266            limit,
267            offset,
268            input_indices,
269            match_indices,
270        )
271    }
272
273    fn is_empty(&self) -> bool {
274        self.map.is_empty()
275    }
276
277    fn len(&self) -> usize {
278        self.map.len()
279    }
280}
281
/// Type of offsets for obtaining indices from JoinHashMap.
///
/// The first element is an index into the probed `hash_values`. The second
/// element describes how far that index has been processed, as interpreted by
/// `get_matched_indices_with_limit_offset`:
/// * `None` — processing of that index has not started yet;
/// * `Some(0)` — that index has been fully processed and is skipped;
/// * `Some(n)` with `n != 0` — resume following the chain at stored value `n`
///   (a row index plus one).
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
284
285/// Traverses the chain of matching indices, collecting results up to the remaining limit.
286/// Returns `Some(offset)` if the limit was reached and there are more results to process,
287/// or `None` if the chain was fully traversed.
288#[inline(always)]
289fn traverse_chain<T>(
290    next_chain: &[T],
291    input_idx: usize,
292    start_chain_idx: T,
293    remaining: &mut usize,
294    input_indices: &mut Vec<u32>,
295    match_indices: &mut Vec<u64>,
296    is_last_input: bool,
297) -> Option<JoinHashMapOffset>
298where
299    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
300    <T as TryFrom<usize>>::Error: Debug,
301    T: ArrowNativeType,
302{
303    let zero = T::usize_as(0);
304    let one = T::usize_as(1);
305    let mut match_row_idx = start_chain_idx - one;
306
307    loop {
308        match_indices.push(match_row_idx.into());
309        input_indices.push(input_idx as u32);
310        *remaining -= 1;
311
312        let next = next_chain[match_row_idx.into() as usize];
313
314        if *remaining == 0 {
315            // Limit reached - return offset for next call
316            return if is_last_input && next == zero {
317                // Finished processing the last input row
318                None
319            } else {
320                Some((input_idx, Some(next.into())))
321            };
322        }
323        if next == zero {
324            // End of chain
325            return None;
326        }
327        match_row_idx = next - one;
328    }
329}
330
331pub fn update_from_iter<'a, T>(
332    map: &mut HashTable<(u64, T)>,
333    next: &mut [T],
334    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + Send + 'a>,
335    deleted_offset: usize,
336) where
337    T: Copy + TryFrom<usize> + PartialOrd,
338    <T as TryFrom<usize>>::Error: Debug,
339{
340    for (row, &hash_value) in iter {
341        let entry = map.entry(
342            hash_value,
343            |&(hash, _)| hash_value == hash,
344            |&(hash, _)| hash,
345        );
346
347        match entry {
348            Occupied(mut occupied_entry) => {
349                // Already exists: add index to next array
350                let (_, index) = occupied_entry.get_mut();
351                let prev_index = *index;
352                // Store new value inside hashmap
353                *index = T::try_from(row + 1).unwrap();
354                // Update chained Vec at `row` with previous value
355                next[row - deleted_offset] = prev_index;
356            }
357            Vacant(vacant_entry) => {
358                vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap()));
359            }
360        }
361    }
362}
363
364pub fn get_matched_indices<'a, T>(
365    map: &HashTable<(u64, T)>,
366    next: &[T],
367    iter: Box<dyn Iterator<Item = (usize, &'a u64)> + 'a>,
368    deleted_offset: Option<usize>,
369) -> (Vec<u32>, Vec<u64>)
370where
371    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
372    <T as TryFrom<usize>>::Error: Debug,
373{
374    let mut input_indices = vec![];
375    let mut match_indices = vec![];
376    let zero = T::try_from(0).unwrap();
377    let one = T::try_from(1).unwrap();
378
379    for (row_idx, hash_value) in iter {
380        // Get the hash and find it in the index
381        if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash)
382        {
383            let mut i = *index - one;
384            loop {
385                let match_row_idx = if let Some(offset) = deleted_offset {
386                    let offset = T::try_from(offset).unwrap();
387                    // This arguments means that we prune the next index way before here.
388                    if i < offset {
389                        // End of the list due to pruning
390                        break;
391                    }
392                    i - offset
393                } else {
394                    i
395                };
396                match_indices.push(match_row_idx.into());
397                input_indices.push(row_idx as u32);
398                // Follow the chain to get the next index value
399                let next_chain = next[match_row_idx.into() as usize];
400                if next_chain == zero {
401                    // end of list
402                    break;
403                }
404                i = next_chain - one;
405            }
406        }
407    }
408
409    (input_indices, match_indices)
410}
411
412pub fn get_matched_indices_with_limit_offset<T>(
413    map: &HashTable<(u64, T)>,
414    next_chain: &[T],
415    hash_values: &[u64],
416    limit: usize,
417    offset: JoinHashMapOffset,
418    input_indices: &mut Vec<u32>,
419    match_indices: &mut Vec<u64>,
420) -> Option<JoinHashMapOffset>
421where
422    T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
423    <T as TryFrom<usize>>::Error: Debug,
424    T: ArrowNativeType,
425{
426    // Clear the buffer before producing new results
427    input_indices.clear();
428    match_indices.clear();
429    let one = T::try_from(1).unwrap();
430
431    // Check if hashmap consists of unique values
432    // If so, we can skip the chain traversal
433    if map.len() == next_chain.len() {
434        let start = offset.0;
435        let end = (start + limit).min(hash_values.len());
436        for (i, &hash) in hash_values[start..end].iter().enumerate() {
437            if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
438                input_indices.push(start as u32 + i as u32);
439                match_indices.push((*idx - one).into());
440            }
441        }
442        return if end == hash_values.len() {
443            None
444        } else {
445            Some((end, None))
446        };
447    }
448
449    let mut remaining_output = limit;
450
451    // Calculate initial `hash_values` index before iterating
452    let to_skip = match offset {
453        // None `initial_next_idx` indicates that `initial_idx` processing hasn't been started
454        (idx, None) => idx,
455        // Zero `initial_next_idx` indicates that `initial_idx` has been processed during
456        // previous iteration, and it should be skipped
457        (idx, Some(0)) => idx + 1,
458        // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
459        // to start with the next index
460        (idx, Some(next_idx)) => {
461            let next_idx: T = T::usize_as(next_idx as usize);
462            let is_last = idx == hash_values.len() - 1;
463            if let Some(next_offset) = traverse_chain(
464                next_chain,
465                idx,
466                next_idx,
467                &mut remaining_output,
468                input_indices,
469                match_indices,
470                is_last,
471            ) {
472                return Some(next_offset);
473            }
474            idx + 1
475        }
476    };
477
478    let hash_values_len = hash_values.len();
479    for (i, &hash) in hash_values[to_skip..].iter().enumerate() {
480        let row_idx = to_skip + i;
481        if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
482            let idx: T = *idx;
483            let is_last = row_idx == hash_values_len - 1;
484            if let Some(next_offset) = traverse_chain(
485                next_chain,
486                row_idx,
487                idx,
488                &mut remaining_output,
489                input_indices,
490                match_indices,
491                is_last,
492            ) {
493                return Some(next_offset);
494            }
495        }
496    }
497    None
498}