1use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
21
22use ahash::RandomState;
23use arrow::{
24 array::{BooleanArray, UInt64Array},
25 buffer::MutableBuffer,
26 datatypes::{DataType, Schema},
27 util::bit_util,
28};
29use datafusion_common::{Result, internal_datafusion_err, internal_err};
30use datafusion_expr::ColumnarValue;
31use datafusion_physical_expr_common::physical_expr::{
32 DynHash, PhysicalExpr, PhysicalExprRef,
33};
34
35use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
36
37#[derive(Clone, Debug)]
42pub struct SeededRandomState {
43 random_state: RandomState,
44 seeds: (u64, u64, u64, u64),
45}
46
47impl SeededRandomState {
48 pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self {
50 Self {
51 random_state: RandomState::with_seeds(k0, k1, k2, k3),
52 seeds: (k0, k1, k2, k3),
53 }
54 }
55
56 pub fn random_state(&self) -> &RandomState {
58 &self.random_state
59 }
60
61 pub fn seeds(&self) -> (u64, u64, u64, u64) {
63 self.seeds
64 }
65}
66
67pub struct HashExpr {
76 on_columns: Vec<PhysicalExprRef>,
78 random_state: SeededRandomState,
80 description: String,
82}
83
84impl HashExpr {
85 pub fn new(
92 on_columns: Vec<PhysicalExprRef>,
93 random_state: SeededRandomState,
94 description: String,
95 ) -> Self {
96 Self {
97 on_columns,
98 random_state,
99 description,
100 }
101 }
102
103 pub fn on_columns(&self) -> &[PhysicalExprRef] {
105 &self.on_columns
106 }
107
108 pub fn seeds(&self) -> (u64, u64, u64, u64) {
110 self.random_state.seeds()
111 }
112
113 pub fn description(&self) -> &str {
115 &self.description
116 }
117}
118
119impl std::fmt::Debug for HashExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 let cols = self
122 .on_columns
123 .iter()
124 .map(|e| e.to_string())
125 .collect::<Vec<_>>()
126 .join(", ");
127 let (s1, s2, s3, s4) = self.seeds();
128 write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
129 }
130}
131
132impl Hash for HashExpr {
133 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
134 self.on_columns.dyn_hash(state);
135 self.description.hash(state);
136 self.seeds().hash(state);
137 }
138}
139
140impl PartialEq for HashExpr {
141 fn eq(&self, other: &Self) -> bool {
142 self.on_columns == other.on_columns
143 && self.description == other.description
144 && self.seeds() == other.seeds()
145 }
146}
147
148impl Eq for HashExpr {}
149
150impl Display for HashExpr {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 write!(f, "{}", self.description)
153 }
154}
155
156impl PhysicalExpr for HashExpr {
157 fn as_any(&self) -> &dyn Any {
158 self
159 }
160
161 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
162 self.on_columns.iter().collect()
163 }
164
165 fn with_new_children(
166 self: Arc<Self>,
167 children: Vec<Arc<dyn PhysicalExpr>>,
168 ) -> Result<Arc<dyn PhysicalExpr>> {
169 Ok(Arc::new(HashExpr::new(
170 children,
171 self.random_state.clone(),
172 self.description.clone(),
173 )))
174 }
175
176 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
177 Ok(DataType::UInt64)
178 }
179
180 fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
181 Ok(false)
182 }
183
184 fn evaluate(
185 &self,
186 batch: &arrow::record_batch::RecordBatch,
187 ) -> Result<ColumnarValue> {
188 let num_rows = batch.num_rows();
189
190 let keys_values = self
192 .on_columns
193 .iter()
194 .map(|c| c.evaluate(batch)?.into_array(num_rows))
195 .collect::<Result<Vec<_>>>()?;
196
197 let mut hashes_buffer = vec![0; num_rows];
199 create_hashes(
200 &keys_values,
201 self.random_state.random_state(),
202 &mut hashes_buffer,
203 )?;
204
205 Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
206 hashes_buffer,
207 ))))
208 }
209
210 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 write!(f, "{}", self.description)
212 }
213}
214
215pub struct HashTableLookupExpr {
220 hash_expr: PhysicalExprRef,
222 hash_map: Arc<dyn JoinHashMapType>,
224 description: String,
226}
227
228impl HashTableLookupExpr {
229 pub fn new(
240 hash_expr: PhysicalExprRef,
241 hash_map: Arc<dyn JoinHashMapType>,
242 description: String,
243 ) -> Self {
244 Self {
245 hash_expr,
246 hash_map,
247 description,
248 }
249 }
250}
251
252impl std::fmt::Debug for HashTableLookupExpr {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}({:?})", self.description, self.hash_expr)
255 }
256}
257
258impl Hash for HashTableLookupExpr {
259 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
260 self.hash_expr.dyn_hash(state);
261 self.description.hash(state);
262 Arc::as_ptr(&self.hash_map).hash(state);
270 }
271}
272
273impl PartialEq for HashTableLookupExpr {
274 fn eq(&self, other: &Self) -> bool {
275 self.hash_expr.as_ref() == other.hash_expr.as_ref()
283 && self.description == other.description
284 && Arc::ptr_eq(&self.hash_map, &other.hash_map)
285 }
286}
287
288impl Eq for HashTableLookupExpr {}
289
290impl Display for HashTableLookupExpr {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 write!(f, "{}", self.description)
293 }
294}
295
296impl PhysicalExpr for HashTableLookupExpr {
297 fn as_any(&self) -> &dyn Any {
298 self
299 }
300
301 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
302 vec![&self.hash_expr]
303 }
304
305 fn with_new_children(
306 self: Arc<Self>,
307 children: Vec<Arc<dyn PhysicalExpr>>,
308 ) -> Result<Arc<dyn PhysicalExpr>> {
309 if children.len() != 1 {
310 return internal_err!(
311 "HashTableLookupExpr expects exactly 1 child, got {}",
312 children.len()
313 );
314 }
315 Ok(Arc::new(HashTableLookupExpr::new(
316 Arc::clone(&children[0]),
317 Arc::clone(&self.hash_map),
318 self.description.clone(),
319 )))
320 }
321
322 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
323 Ok(DataType::Boolean)
324 }
325
326 fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
327 Ok(false)
328 }
329
330 fn evaluate(
331 &self,
332 batch: &arrow::record_batch::RecordBatch,
333 ) -> Result<ColumnarValue> {
334 let num_rows = batch.num_rows();
335
336 let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?;
338 let hash_array = hash_array.as_any().downcast_ref::<UInt64Array>().ok_or(
339 internal_datafusion_err!(
340 "HashTableLookupExpr expects UInt64Array from hash expression"
341 ),
342 )?;
343
344 let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
346 for (idx, hash_value) in hash_array.values().iter().enumerate() {
347 let (matched_indices, _) = self
349 .hash_map
350 .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None);
351
352 if !matched_indices.is_empty() {
353 bit_util::set_bit(buf.as_slice_mut(), idx);
354 }
355 }
356
357 Ok(ColumnarValue::Array(Arc::new(
358 BooleanArray::new_from_packed(buf, 0, num_rows),
359 )))
360 }
361
362 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 write!(f, "{}", self.description)
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use crate::joins::join_hash_map::JoinHashMapU32;
    use datafusion_physical_expr::expressions::Column;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    /// Hashes `value` with std's `DefaultHasher` so tests can check that
    /// equal expressions also produce equal hashes.
    fn compute_hash<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    /// Column reference `name` at position `index`.
    fn col(name: &str, index: usize) -> PhysicalExprRef {
        Arc::new(Column::new(name, index))
    }

    /// `HashExpr` over `columns`, seeded with `(k0, k1, k2, k3)`.
    fn make_hash_expr(
        columns: Vec<PhysicalExprRef>,
        (k0, k1, k2, k3): (u64, u64, u64, u64),
        description: &str,
    ) -> HashExpr {
        HashExpr::new(
            columns,
            SeededRandomState::with_seeds(k0, k1, k2, k3),
            description.to_string(),
        )
    }

    /// Shared `HashExpr` named "inner_hash" over `column`, seeds `(1, 2, 3, 4)`.
    fn inner_hash(column: &PhysicalExprRef) -> PhysicalExprRef {
        Arc::new(make_hash_expr(
            vec![Arc::clone(column)],
            (1, 2, 3, 4),
            "inner_hash",
        ))
    }

    /// Fresh hash map created with `JoinHashMapU32::with_capacity(10)`.
    fn new_hash_map() -> Arc<dyn JoinHashMapType> {
        Arc::new(JoinHashMapU32::with_capacity(10))
    }

    /// Lookup expression sharing `hash_expr` and `hash_map` via `Arc::clone`.
    fn make_lookup(
        hash_expr: &PhysicalExprRef,
        hash_map: &Arc<dyn JoinHashMapType>,
        description: &str,
    ) -> HashTableLookupExpr {
        HashTableLookupExpr::new(
            Arc::clone(hash_expr),
            Arc::clone(hash_map),
            description.to_string(),
        )
    }

    #[test]
    fn test_hash_expr_eq_same() {
        let (a, b) = (col("a", 0), col("b", 1));
        let first = make_hash_expr(
            vec![Arc::clone(&a), Arc::clone(&b)],
            (1, 2, 3, 4),
            "test_hash",
        );
        let second = make_hash_expr(vec![a, b], (1, 2, 3, 4), "test_hash");
        assert_eq!(first, second);
    }

    #[test]
    fn test_hash_expr_eq_different_columns() {
        let (a, b, c) = (col("a", 0), col("b", 1), col("c", 2));
        let first = make_hash_expr(vec![Arc::clone(&a), b], (1, 2, 3, 4), "test_hash");
        let second = make_hash_expr(vec![a, c], (1, 2, 3, 4), "test_hash");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_expr_eq_different_description() {
        let a = col("a", 0);
        let first = make_hash_expr(vec![Arc::clone(&a)], (1, 2, 3, 4), "hash_one");
        let second = make_hash_expr(vec![a], (1, 2, 3, 4), "hash_two");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_expr_eq_different_seeds() {
        let a = col("a", 0);
        let first = make_hash_expr(vec![Arc::clone(&a)], (1, 2, 3, 4), "test_hash");
        let second = make_hash_expr(vec![a], (5, 6, 7, 8), "test_hash");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_expr_hash_consistency() {
        let (a, b) = (col("a", 0), col("b", 1));
        let first = make_hash_expr(
            vec![Arc::clone(&a), Arc::clone(&b)],
            (1, 2, 3, 4),
            "test_hash",
        );
        let second = make_hash_expr(vec![a, b], (1, 2, 3, 4), "test_hash");
        assert_eq!(first, second);
        // Values that compare equal must hash identically.
        assert_eq!(compute_hash(&first), compute_hash(&second));
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_same() {
        let hash_expr = inner_hash(&col("a", 0));
        let hash_map = new_hash_map();
        let first = make_lookup(&hash_expr, &hash_map, "lookup");
        let second = make_lookup(&hash_expr, &hash_map, "lookup");
        assert_eq!(first, second);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_hash_expr() {
        let on_a = inner_hash(&col("a", 0));
        let on_b = inner_hash(&col("b", 1));
        let hash_map = new_hash_map();
        let first = make_lookup(&on_a, &hash_map, "lookup");
        let second = make_lookup(&on_b, &hash_map, "lookup");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_description() {
        let hash_expr = inner_hash(&col("a", 0));
        let hash_map = new_hash_map();
        let first = make_lookup(&hash_expr, &hash_map, "lookup_one");
        let second = make_lookup(&hash_expr, &hash_map, "lookup_two");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_hash_map() {
        let hash_expr = inner_hash(&col("a", 0));
        // Two maps built the same way but living in distinct allocations;
        // each lookup keeps its map alive through its own `Arc` clone.
        let first = make_lookup(&hash_expr, &new_hash_map(), "lookup");
        let second = make_lookup(&hash_expr, &new_hash_map(), "lookup");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_table_lookup_expr_hash_consistency() {
        let hash_expr = inner_hash(&col("a", 0));
        let hash_map = new_hash_map();
        let first = make_lookup(&hash_expr, &hash_map, "lookup");
        let second = make_lookup(&hash_expr, &hash_map, "lookup");
        assert_eq!(first, second);
        // Values that compare equal must hash identically.
        assert_eq!(compute_hash(&first), compute_hash(&second));
    }
}