use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
use ahash::RandomState;
use arrow::{
array::{ArrayRef, UInt64Array},
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
use datafusion_common::Result;
use datafusion_common::hash_utils::{create_hashes, with_hashes};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{
DynHash, PhysicalExpr, PhysicalExprRef,
};
use crate::joins::Map;
/// An `ahash` [`RandomState`] bundled with the four seeds it was built from.
///
/// The seeds are stored next to the state so they can be read back with
/// [`SeededRandomState::seeds`], for example to compare or hash expressions
/// that carry one of these.
#[derive(Clone, Debug)]
pub struct SeededRandomState {
    /// State created by `RandomState::with_seeds` from `seeds`.
    random_state: RandomState,
    /// Seeds in `(k0, k1, k2, k3)` order.
    seeds: (u64, u64, u64, u64),
}

impl SeededRandomState {
    /// Creates a random state from explicit seeds and remembers those seeds.
    ///
    /// This is a `const fn`, so it can be used in constant contexts.
    pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self {
        let seeds = (k0, k1, k2, k3);
        let random_state = RandomState::with_seeds(k0, k1, k2, k3);
        Self {
            random_state,
            seeds,
        }
    }

    /// Borrows the underlying `ahash` state used for hashing.
    pub fn random_state(&self) -> &RandomState {
        &self.random_state
    }

    /// Returns the seeds this state was built from, as `(k0, k1, k2, k3)`.
    pub fn seeds(&self) -> (u64, u64, u64, u64) {
        let (k0, k1, k2, k3) = self.seeds;
        (k0, k1, k2, k3)
    }
}
pub struct HashExpr {
on_columns: Vec<PhysicalExprRef>,
random_state: SeededRandomState,
description: String,
}
impl HashExpr {
pub fn new(
on_columns: Vec<PhysicalExprRef>,
random_state: SeededRandomState,
description: String,
) -> Self {
Self {
on_columns,
random_state,
description,
}
}
pub fn on_columns(&self) -> &[PhysicalExprRef] {
&self.on_columns
}
pub fn seeds(&self) -> (u64, u64, u64, u64) {
self.random_state.seeds()
}
pub fn description(&self) -> &str {
&self.description
}
}
impl std::fmt::Debug for HashExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cols = self
.on_columns
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ");
let (s1, s2, s3, s4) = self.seeds();
write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
}
}
impl Hash for HashExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.on_columns.dyn_hash(state);
self.description.hash(state);
self.seeds().hash(state);
}
}
impl PartialEq for HashExpr {
fn eq(&self, other: &Self) -> bool {
self.on_columns == other.on_columns
&& self.description == other.description
&& self.seeds() == other.seeds()
}
}
impl Eq for HashExpr {}
impl Display for HashExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description)
}
}
impl PhysicalExpr for HashExpr {
fn as_any(&self) -> &dyn Any {
self
}
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.on_columns.iter().collect()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(HashExpr::new(
children,
self.random_state.clone(),
self.description.clone(),
)))
}
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(DataType::UInt64)
}
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
Ok(false)
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let num_rows = batch.num_rows();
let keys_values = evaluate_columns(&self.on_columns, batch)?;
let mut hashes_buffer = vec![0; num_rows];
create_hashes(
&keys_values,
self.random_state.random_state(),
&mut hashes_buffer,
)?;
Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
hashes_buffer,
))))
}
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description)
}
}
/// Physical expression that looks up each row's join key in a shared [`Map`].
///
/// Evaluating it produces a non-nullable `Boolean` array. The array comes from
/// the map's `contain_hashes` (for `Map::HashMap`, probed with row hashes) or
/// its `contain_keys` (for `Map::ArrayMap`, probed with the key arrays).
pub struct HashTableLookupExpr {
    /// Expressions that produce the lookup key for each row.
    on_columns: Vec<PhysicalExprRef>,
    /// Hashing state used to hash keys before probing a `Map::HashMap`.
    random_state: SeededRandomState,
    /// Map being probed. It is shared, and compared by pointer identity.
    map: Arc<Map>,
    /// Text emitted by `Display` and `fmt_sql`.
    description: String,
}

impl HashTableLookupExpr {
    /// Creates a lookup expression that probes `map` with keys from `on_columns`.
    ///
    /// NOTE(review): `random_state` presumably has to match the state used when
    /// `map` was built, or hash lookups will miss. Confirm with the caller.
    pub fn new(
        on_columns: Vec<PhysicalExprRef>,
        random_state: SeededRandomState,
        map: Arc<Map>,
        description: String,
    ) -> Self {
        let expr = Self {
            on_columns,
            random_state,
            map,
            description,
        };
        expr
    }
}
impl std::fmt::Debug for HashTableLookupExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cols = self
.on_columns
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ");
let (s1, s2, s3, s4) = self.random_state.seeds();
write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
}
}
impl Hash for HashTableLookupExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.on_columns.dyn_hash(state);
self.description.hash(state);
self.random_state.seeds().hash(state);
Arc::as_ptr(&self.map).hash(state);
}
}
impl PartialEq for HashTableLookupExpr {
fn eq(&self, other: &Self) -> bool {
self.on_columns == other.on_columns
&& self.description == other.description
&& self.random_state.seeds() == other.random_state.seeds()
&& Arc::ptr_eq(&self.map, &other.map)
}
}
impl Eq for HashTableLookupExpr {}
impl Display for HashTableLookupExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description)
}
}
impl PhysicalExpr for HashTableLookupExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The key expressions are the only children. The map is not part of the tree.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        self.on_columns.iter().collect::<Vec<_>>()
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // The rebuilt expression keeps probing the same shared map.
        let map = Arc::clone(&self.map);
        let lookup = HashTableLookupExpr::new(
            children,
            self.random_state.clone(),
            map,
            self.description.clone(),
        );
        Ok(Arc::new(lookup))
    }

    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
        Ok(false)
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let keys = evaluate_columns(&self.on_columns, batch)?;
        match &*self.map {
            // Hash-table maps are probed with the hash of each row's keys.
            Map::HashMap(table) => {
                let state = self.random_state.random_state();
                with_hashes(&keys, state, |hashes| {
                    let found = table.contain_hashes(hashes);
                    Ok(ColumnarValue::Array(Arc::new(found)))
                })
            }
            // Array maps are probed with the key arrays directly, without hashing.
            Map::ArrayMap(table) => {
                let found = table.contain_keys(&keys)?;
                Ok(ColumnarValue::Array(Arc::new(found)))
            }
        }
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.description)
    }
}
/// Evaluates every expression in `columns` against `batch`.
///
/// Each resulting `ColumnarValue` is converted to an array with
/// `into_array(batch.num_rows())`. The function returns the first error
/// encountered and does not evaluate the remaining expressions.
fn evaluate_columns(
    columns: &[PhysicalExprRef],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let rows = batch.num_rows();
    let mut arrays = Vec::with_capacity(columns.len());
    for expr in columns {
        let value = expr.evaluate(batch)?;
        arrays.push(value.into_array(rows)?);
    }
    Ok(arrays)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::joins::join_hash_map::JoinHashMapU32;
    use datafusion_physical_expr::expressions::Column;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    type Seeds = (u64, u64, u64, u64);

    /// Seeds shared by most expressions under test.
    const SEEDS: Seeds = (1, 2, 3, 4);

    /// Hashes `value` with std's `DefaultHasher`.
    fn compute_hash<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    /// Builds a column reference expression.
    fn col(name: &str, index: usize) -> PhysicalExprRef {
        Arc::new(Column::new(name, index))
    }

    /// Builds a `HashExpr` over Arc-clones of `cols`.
    fn hash_expr(cols: &[PhysicalExprRef], seeds: Seeds, desc: &str) -> HashExpr {
        let (k0, k1, k2, k3) = seeds;
        HashExpr::new(
            cols.to_vec(),
            SeededRandomState::with_seeds(k0, k1, k2, k3),
            desc.to_string(),
        )
    }

    /// Builds a `HashTableLookupExpr` with `SEEDS` that probes a clone of `map`.
    fn lookup_expr(
        cols: &[PhysicalExprRef],
        map: &Arc<Map>,
        desc: &str,
    ) -> HashTableLookupExpr {
        let (k0, k1, k2, k3) = SEEDS;
        HashTableLookupExpr::new(
            cols.to_vec(),
            SeededRandomState::with_seeds(k0, k1, k2, k3),
            Arc::clone(map),
            desc.to_string(),
        )
    }

    /// Allocates a fresh hash-table map with capacity 10.
    fn new_map() -> Arc<Map> {
        Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10))))
    }

    #[test]
    fn test_hash_expr_eq_same() {
        let cols = [col("a", 0), col("b", 1)];
        assert_eq!(
            hash_expr(&cols, SEEDS, "test_hash"),
            hash_expr(&cols, SEEDS, "test_hash")
        );
    }

    #[test]
    fn test_hash_expr_eq_different_columns() {
        let (a, b, c) = (col("a", 0), col("b", 1), col("c", 2));
        let ab = hash_expr(&[Arc::clone(&a), b], SEEDS, "test_hash");
        let ac = hash_expr(&[a, c], SEEDS, "test_hash");
        assert_ne!(ab, ac);
    }

    #[test]
    fn test_hash_expr_eq_different_description() {
        let cols = [col("a", 0)];
        let one = hash_expr(&cols, SEEDS, "hash_one");
        let two = hash_expr(&cols, SEEDS, "hash_two");
        assert_ne!(one, two);
    }

    #[test]
    fn test_hash_expr_eq_different_seeds() {
        let cols = [col("a", 0)];
        let first = hash_expr(&cols, SEEDS, "test_hash");
        let second = hash_expr(&cols, (5, 6, 7, 8), "test_hash");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_expr_hash_consistency() {
        let cols = [col("a", 0), col("b", 1)];
        let left = hash_expr(&cols, SEEDS, "test_hash");
        let right = hash_expr(&cols, SEEDS, "test_hash");
        assert_eq!(left, right);
        assert_eq!(compute_hash(&left), compute_hash(&right));
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_same() {
        let map = new_map();
        let cols = [col("a", 0)];
        assert_eq!(
            lookup_expr(&cols, &map, "lookup"),
            lookup_expr(&cols, &map, "lookup")
        );
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_columns() {
        let map = new_map();
        let on_a = lookup_expr(&[col("a", 0)], &map, "lookup");
        let on_b = lookup_expr(&[col("b", 1)], &map, "lookup");
        assert_ne!(on_a, on_b);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_description() {
        let map = new_map();
        let cols = [col("a", 0)];
        let one = lookup_expr(&cols, &map, "lookup_one");
        let two = lookup_expr(&cols, &map, "lookup_two");
        assert_ne!(one, two);
    }

    #[test]
    fn test_hash_table_lookup_expr_eq_different_hash_map() {
        // Two maps with identical contents are still distinct allocations.
        let cols = [col("a", 0)];
        let first = lookup_expr(&cols, &new_map(), "lookup");
        let second = lookup_expr(&cols, &new_map(), "lookup");
        assert_ne!(first, second);
    }

    #[test]
    fn test_hash_table_lookup_expr_hash_consistency() {
        let map = new_map();
        let cols = [col("a", 0)];
        let left = lookup_expr(&cols, &map, "lookup");
        let right = lookup_expr(&cols, &map, "lookup");
        assert_eq!(left, right);
        assert_eq!(compute_hash(&left), compute_hash(&right));
    }
}