Skip to main content

datafusion_physical_plan/joins/hash_join/
partitioned_hash_eval.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Hash computation and hash table lookup expressions for dynamic filtering
19
20use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
21
22use ahash::RandomState;
23use arrow::{
24    array::{BooleanArray, UInt64Array},
25    buffer::MutableBuffer,
26    datatypes::{DataType, Schema},
27    util::bit_util,
28};
29use datafusion_common::{Result, internal_datafusion_err, internal_err};
30use datafusion_expr::ColumnarValue;
31use datafusion_physical_expr_common::physical_expr::{
32    DynHash, PhysicalExpr, PhysicalExprRef,
33};
34
35use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
36
37/// RandomState wrapper that preserves the seeds used to create it.
38///
39/// This is needed because ahash's `RandomState` doesn't expose its seeds after creation,
40/// but we need them for serialization (e.g., protobuf serde).
41#[derive(Clone, Debug)]
42pub struct SeededRandomState {
43    random_state: RandomState,
44    seeds: (u64, u64, u64, u64),
45}
46
47impl SeededRandomState {
48    /// Create a new SeededRandomState with the given seeds.
49    pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self {
50        Self {
51            random_state: RandomState::with_seeds(k0, k1, k2, k3),
52            seeds: (k0, k1, k2, k3),
53        }
54    }
55
56    /// Get the inner RandomState.
57    pub fn random_state(&self) -> &RandomState {
58        &self.random_state
59    }
60
61    /// Get the seeds used to create this RandomState.
62    pub fn seeds(&self) -> (u64, u64, u64, u64) {
63        self.seeds
64    }
65}
66
67/// Physical expression that computes hash values for a set of columns
68///
69/// This expression computes the hash of join key columns using a specific RandomState.
70/// It returns a UInt64Array containing the hash values.
71///
72/// This is used for:
73/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
74/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
75pub struct HashExpr {
76    /// Columns to hash
77    on_columns: Vec<PhysicalExprRef>,
78    /// Random state for hashing (with seeds preserved for serialization)
79    random_state: SeededRandomState,
80    /// Description for display
81    description: String,
82}
83
84impl HashExpr {
85    /// Create a new HashExpr
86    ///
87    /// # Arguments
88    /// * `on_columns` - Columns to hash
89    /// * `random_state` - SeededRandomState for hashing
90    /// * `description` - Description for debugging (e.g., "hash_repartition", "hash_join")
91    pub fn new(
92        on_columns: Vec<PhysicalExprRef>,
93        random_state: SeededRandomState,
94        description: String,
95    ) -> Self {
96        Self {
97            on_columns,
98            random_state,
99            description,
100        }
101    }
102
103    /// Get the columns being hashed.
104    pub fn on_columns(&self) -> &[PhysicalExprRef] {
105        &self.on_columns
106    }
107
108    /// Get the seeds used for hashing.
109    pub fn seeds(&self) -> (u64, u64, u64, u64) {
110        self.random_state.seeds()
111    }
112
113    /// Get the description.
114    pub fn description(&self) -> &str {
115        &self.description
116    }
117}
118
119impl std::fmt::Debug for HashExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        let cols = self
122            .on_columns
123            .iter()
124            .map(|e| e.to_string())
125            .collect::<Vec<_>>()
126            .join(", ");
127        let (s1, s2, s3, s4) = self.seeds();
128        write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
129    }
130}
131
132impl Hash for HashExpr {
133    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
134        self.on_columns.dyn_hash(state);
135        self.description.hash(state);
136        self.seeds().hash(state);
137    }
138}
139
140impl PartialEq for HashExpr {
141    fn eq(&self, other: &Self) -> bool {
142        self.on_columns == other.on_columns
143            && self.description == other.description
144            && self.seeds() == other.seeds()
145    }
146}
147
148impl Eq for HashExpr {}
149
150impl Display for HashExpr {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        write!(f, "{}", self.description)
153    }
154}
155
156impl PhysicalExpr for HashExpr {
157    fn as_any(&self) -> &dyn Any {
158        self
159    }
160
161    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
162        self.on_columns.iter().collect()
163    }
164
165    fn with_new_children(
166        self: Arc<Self>,
167        children: Vec<Arc<dyn PhysicalExpr>>,
168    ) -> Result<Arc<dyn PhysicalExpr>> {
169        Ok(Arc::new(HashExpr::new(
170            children,
171            self.random_state.clone(),
172            self.description.clone(),
173        )))
174    }
175
176    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
177        Ok(DataType::UInt64)
178    }
179
180    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
181        Ok(false)
182    }
183
184    fn evaluate(
185        &self,
186        batch: &arrow::record_batch::RecordBatch,
187    ) -> Result<ColumnarValue> {
188        let num_rows = batch.num_rows();
189
190        // Evaluate columns
191        let keys_values = self
192            .on_columns
193            .iter()
194            .map(|c| c.evaluate(batch)?.into_array(num_rows))
195            .collect::<Result<Vec<_>>>()?;
196
197        // Compute hashes
198        let mut hashes_buffer = vec![0; num_rows];
199        create_hashes(
200            &keys_values,
201            self.random_state.random_state(),
202            &mut hashes_buffer,
203        )?;
204
205        Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
206            hashes_buffer,
207        ))))
208    }
209
210    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        write!(f, "{}", self.description)
212    }
213}
214
215/// Physical expression that checks if hash values exist in a hash table
216///
217/// Takes a UInt64Array of hash values and checks membership in a hash table.
218/// Returns a BooleanArray indicating which hashes exist.
219pub struct HashTableLookupExpr {
220    /// Expression that computes hash values (should be a HashExpr)
221    hash_expr: PhysicalExprRef,
222    /// Hash table to check against
223    hash_map: Arc<dyn JoinHashMapType>,
224    /// Description for display
225    description: String,
226}
227
228impl HashTableLookupExpr {
229    /// Create a new HashTableLookupExpr
230    ///
231    /// # Arguments
232    /// * `hash_expr` - Expression that computes hash values
233    /// * `hash_map` - Hash table to check membership
234    /// * `description` - Description for debugging
235    ///
236    /// # Note
237    /// This is public for internal testing purposes only and is not
238    /// guaranteed to be stable across versions.
239    pub fn new(
240        hash_expr: PhysicalExprRef,
241        hash_map: Arc<dyn JoinHashMapType>,
242        description: String,
243    ) -> Self {
244        Self {
245            hash_expr,
246            hash_map,
247            description,
248        }
249    }
250}
251
252impl std::fmt::Debug for HashTableLookupExpr {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        write!(f, "{}({:?})", self.description, self.hash_expr)
255    }
256}
257
258impl Hash for HashTableLookupExpr {
259    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
260        self.hash_expr.dyn_hash(state);
261        self.description.hash(state);
262        // Note that we compare hash_map by pointer equality.
263        // Actually comparing the contents of the hash maps would be expensive.
264        // The way these hash maps are used in actuality is that HashJoinExec creates
265        // one per partition per query execution, thus it is never possible for two different
266        // hash maps to have the same content in practice.
267        // Theoretically this is a public API and users could create identical hash maps,
268        // but that seems unlikely and not worth paying the cost of deep comparison all the time.
269        Arc::as_ptr(&self.hash_map).hash(state);
270    }
271}
272
273impl PartialEq for HashTableLookupExpr {
274    fn eq(&self, other: &Self) -> bool {
275        // Note that we compare hash_map by pointer equality.
276        // Actually comparing the contents of the hash maps would be expensive.
277        // The way these hash maps are used in actuality is that HashJoinExec creates
278        // one per partition per query execution, thus it is never possible for two different
279        // hash maps to have the same content in practice.
280        // Theoretically this is a public API and users could create identical hash maps,
281        // but that seems unlikely and not worth paying the cost of deep comparison all the time.
282        self.hash_expr.as_ref() == other.hash_expr.as_ref()
283            && self.description == other.description
284            && Arc::ptr_eq(&self.hash_map, &other.hash_map)
285    }
286}
287
288impl Eq for HashTableLookupExpr {}
289
290impl Display for HashTableLookupExpr {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        write!(f, "{}", self.description)
293    }
294}
295
296impl PhysicalExpr for HashTableLookupExpr {
297    fn as_any(&self) -> &dyn Any {
298        self
299    }
300
301    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
302        vec![&self.hash_expr]
303    }
304
305    fn with_new_children(
306        self: Arc<Self>,
307        children: Vec<Arc<dyn PhysicalExpr>>,
308    ) -> Result<Arc<dyn PhysicalExpr>> {
309        if children.len() != 1 {
310            return internal_err!(
311                "HashTableLookupExpr expects exactly 1 child, got {}",
312                children.len()
313            );
314        }
315        Ok(Arc::new(HashTableLookupExpr::new(
316            Arc::clone(&children[0]),
317            Arc::clone(&self.hash_map),
318            self.description.clone(),
319        )))
320    }
321
322    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
323        Ok(DataType::Boolean)
324    }
325
326    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
327        Ok(false)
328    }
329
330    fn evaluate(
331        &self,
332        batch: &arrow::record_batch::RecordBatch,
333    ) -> Result<ColumnarValue> {
334        let num_rows = batch.num_rows();
335
336        // Evaluate hash expression to get hash values
337        let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?;
338        let hash_array = hash_array.as_any().downcast_ref::<UInt64Array>().ok_or(
339            internal_datafusion_err!(
340                "HashTableLookupExpr expects UInt64Array from hash expression"
341            ),
342        )?;
343
344        // Check each hash against the hash table
345        let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
346        for (idx, hash_value) in hash_array.values().iter().enumerate() {
347            // Use get_matched_indices to check - if it returns any indices, the hash exists
348            let (matched_indices, _) = self
349                .hash_map
350                .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None);
351
352            if !matched_indices.is_empty() {
353                bit_util::set_bit(buf.as_slice_mut(), idx);
354            }
355        }
356
357        Ok(ColumnarValue::Array(Arc::new(
358            BooleanArray::new_from_packed(buf, 0, num_rows),
359        )))
360    }
361
362    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        write!(f, "{}", self.description)
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use crate::joins::join_hash_map::JoinHashMapU32;
    use datafusion_physical_expr::expressions::Column;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    /// Seeds used by every fixture that is not about seed differences.
    const SEEDS: [u64; 4] = [1, 2, 3, 4];

    /// Hash `value` with a fresh `DefaultHasher`.
    fn compute_hash<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        Hash::hash(value, &mut hasher);
        hasher.finish()
    }

    /// Column expression fixture.
    fn column(name: &str, index: usize) -> PhysicalExprRef {
        Arc::new(Column::new(name, index))
    }

    /// `HashExpr` fixture over `columns` with the given seeds and description.
    fn hash_expr(
        columns: Vec<PhysicalExprRef>,
        seeds: [u64; 4],
        description: &str,
    ) -> HashExpr {
        let [k0, k1, k2, k3] = seeds;
        HashExpr::new(
            columns,
            SeededRandomState::with_seeds(k0, k1, k2, k3),
            description.to_string(),
        )
    }

    /// Single-column "inner_hash" expression used as a lookup child.
    fn inner_hash(on: &PhysicalExprRef) -> PhysicalExprRef {
        Arc::new(hash_expr(vec![Arc::clone(on)], SEEDS, "inner_hash"))
    }

    /// Freshly allocated, empty hash map with capacity 10.
    fn new_hash_map() -> Arc<dyn JoinHashMapType> {
        Arc::new(JoinHashMapU32::with_capacity(10))
    }

    /// `HashTableLookupExpr` fixture sharing the given child and hash map.
    fn lookup(
        child: &PhysicalExprRef,
        map: &Arc<dyn JoinHashMapType>,
        description: &str,
    ) -> HashTableLookupExpr {
        HashTableLookupExpr::new(
            Arc::clone(child),
            Arc::clone(map),
            description.to_string(),
        )
    }

    #[test]
    fn test_hash_expr_eq_same() {
        let a = column("a", 0);
        let b = column("b", 1);

        let left = hash_expr(vec![Arc::clone(&a), Arc::clone(&b)], SEEDS, "test_hash");
        let right = hash_expr(vec![Arc::clone(&a), Arc::clone(&b)], SEEDS, "test_hash");

        assert_eq!(left, right);
    }

    #[test]
    fn test_hash_expr_eq_different_columns() {
        let a = column("a", 0);
        let b = column("b", 1);
        let c = column("c", 2);

        let left = hash_expr(vec![Arc::clone(&a), Arc::clone(&b)], SEEDS, "test_hash");
        let right = hash_expr(vec![Arc::clone(&a), Arc::clone(&c)], SEEDS, "test_hash");

        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_expr_eq_different_description() {
        let a = column("a", 0);

        let left = hash_expr(vec![Arc::clone(&a)], SEEDS, "hash_one");
        let right = hash_expr(vec![Arc::clone(&a)], SEEDS, "hash_two");

        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_expr_eq_different_seeds() {
        let a = column("a", 0);

        let left = hash_expr(vec![Arc::clone(&a)], SEEDS, "test_hash");
        let right = hash_expr(vec![Arc::clone(&a)], [5, 6, 7, 8], "test_hash");

        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_expr_hash_consistency() {
        let a = column("a", 0);
        let b = column("b", 1);

        let left = hash_expr(vec![Arc::clone(&a), Arc::clone(&b)], SEEDS, "test_hash");
        let right = hash_expr(vec![Arc::clone(&a), Arc::clone(&b)], SEEDS, "test_hash");

        // Equal expressions must produce equal hashes.
        assert_eq!(left, right);
        assert_eq!(compute_hash(&left), compute_hash(&right));
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_same() {
        let child = inner_hash(&column("a", 0));
        let map = new_hash_map();

        let left = lookup(&child, &map, "lookup");
        let right = lookup(&child, &map, "lookup");

        assert_eq!(left, right);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_hash_expr() {
        let child_a = inner_hash(&column("a", 0));
        let child_b = inner_hash(&column("b", 1));
        let map = new_hash_map();

        let left = lookup(&child_a, &map, "lookup");
        let right = lookup(&child_b, &map, "lookup");

        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_description() {
        let child = inner_hash(&column("a", 0));
        let map = new_hash_map();

        let left = lookup(&child, &map, "lookup_one");
        let right = lookup(&child, &map, "lookup_two");

        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_hash_map() {
        let child = inner_hash(&column("a", 0));

        // Two separate allocations, even with the same content, are distinct.
        let first_map = new_hash_map();
        let second_map = new_hash_map();

        let left = lookup(&child, &first_map, "lookup");
        let right = lookup(&child, &second_map, "lookup");

        // Equality uses Arc::ptr_eq, so different allocations are not equal.
        assert_ne!(left, right);
    }

    #[test]
    fn test_hash_table_lookup_expr_hash_consistency() {
        let child = inner_hash(&column("a", 0));
        let map = new_hash_map();

        let left = lookup(&child, &map, "lookup");
        let right = lookup(&child, &map, "lookup");

        // Equal expressions must produce equal hashes.
        assert_eq!(left, right);
        assert_eq!(compute_hash(&left), compute_hash(&right));
    }
}