use arrow_schema::DataType;
use num_traits::AsPrimitive;
use std::mem::size_of;
use crate::joins::MapOffset;
use crate::joins::chain::traverse_chain;
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::buffer::BooleanBuffer;
use arrow::datatypes::ArrowNumericType;
use datafusion_common::{Result, ScalarValue, internal_err};
/// Dispatches a generic [`ArrayMap`] associated function on an integer Arrow data type.
///
/// Usage: `downcast_supported_integer!(data_type => (method, arg1, arg2, ...))`
///
/// The macro expands to a `match` over the eight signed/unsigned integer types
/// up to 64 bits and calls `ArrayMap::method::<T>(args...)` with the matching
/// Arrow primitive type `T`. For any other data type the expansion executes
/// `return internal_err!(...)`, so it may only be used inside a function that
/// returns a DataFusion `Result`.
macro_rules! downcast_supported_integer {
    ($data_type:expr => ($method:ident $(, $arg:expr)*)) => {
        match $data_type {
            arrow::datatypes::DataType::Int8 => {
                ArrayMap::$method::<arrow::datatypes::Int8Type>($($arg),*)
            }
            arrow::datatypes::DataType::Int16 => {
                ArrayMap::$method::<arrow::datatypes::Int16Type>($($arg),*)
            }
            arrow::datatypes::DataType::Int32 => {
                ArrayMap::$method::<arrow::datatypes::Int32Type>($($arg),*)
            }
            arrow::datatypes::DataType::Int64 => {
                ArrayMap::$method::<arrow::datatypes::Int64Type>($($arg),*)
            }
            arrow::datatypes::DataType::UInt8 => {
                ArrayMap::$method::<arrow::datatypes::UInt8Type>($($arg),*)
            }
            arrow::datatypes::DataType::UInt16 => {
                ArrayMap::$method::<arrow::datatypes::UInt16Type>($($arg),*)
            }
            arrow::datatypes::DataType::UInt32 => {
                ArrayMap::$method::<arrow::datatypes::UInt32Type>($($arg),*)
            }
            arrow::datatypes::DataType::UInt64 => {
                ArrayMap::$method::<arrow::datatypes::UInt64Type>($($arg),*)
            }
            // Anything that is not a <= 64-bit integer cannot be used as a table index.
            _ => {
                return internal_err!(
                    "Unsupported type for ArrayMap: {:?}",
                    $data_type
                );
            }
        }
    };
}
/// A direct-address lookup table from integer join keys to build-side row numbers.
///
/// A key `k` is stored at slot `k.wrapping_sub(offset)` of `data`. Row numbers are
/// kept 1-based so that `0` can mean "no row".
#[derive(Debug)]
pub struct ArrayMap {
    /// One slot per possible key in the covered range. `0` marks an empty slot;
    /// otherwise the slot holds `row + 1` for the first build row with that key.
    data: Vec<u32>,
    /// The minimum key value; subtracted (wrapping) from a key to obtain its slot.
    offset: u64,
    /// Chain links for duplicate keys: `next[row]` holds the 1-based number of the
    /// following build row with the same key, or `0` at the end of the chain.
    /// Left empty when the build side contains no duplicate keys.
    next: Vec<u32>,
    /// Number of distinct non-null keys inserted while building the map.
    num_of_distinct_key: usize,
}
impl ArrayMap {
/// Returns `true` when `data_type` is one of the integer types an [`ArrayMap`]
/// can be built over: `Int8`..`Int64` or `UInt8`..`UInt64`.
pub fn is_supported_type(data_type: &DataType) -> bool {
    let is_signed = matches!(
        data_type,
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64
    );
    let is_unsigned = matches!(
        data_type,
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64
    );
    is_signed || is_unsigned
}
/// Converts a non-null integer scalar into the `u64` key representation.
///
/// Signed values are converted with an `as u64` cast, so negative numbers are
/// sign-extended into the upper half of the `u64` range. Returns `None` for
/// null scalars and for every non-integer scalar type.
pub(crate) fn key_to_u64(v: &ScalarValue) -> Option<u64> {
    let widened = match v {
        ScalarValue::Int8(Some(k)) => *k as u64,
        ScalarValue::Int16(Some(k)) => *k as u64,
        ScalarValue::Int32(Some(k)) => *k as u64,
        ScalarValue::Int64(Some(k)) => *k as u64,
        ScalarValue::UInt8(Some(k)) => *k as u64,
        ScalarValue::UInt16(Some(k)) => *k as u64,
        ScalarValue::UInt32(Some(k)) => *k as u64,
        ScalarValue::UInt64(Some(k)) => *k,
        _ => return None,
    };
    Some(widened)
}
/// Estimates the number of bytes an [`ArrayMap`] would need for keys in
/// `[min_val, max_val]` and `num_rows` build rows.
///
/// The estimate is one `u32` slot per key in the range plus one `u32` per build
/// row. All arithmetic saturates, and `usize::MAX` is returned when the slot
/// count itself cannot be represented as a `usize`.
pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) -> usize {
    let span = Self::calculate_range(min_val, max_val);
    // `span + 1` slots are required, so `span` must stay strictly below `usize::MAX`.
    if span >= usize::MAX as u64 {
        return usize::MAX;
    }

    let slot_bytes = size_of::<u32>();
    let table_bytes = ((span + 1) as usize).saturating_mul(slot_bytes);
    let chain_bytes = num_rows.saturating_mul(slot_bytes);
    table_bytes.saturating_add(chain_bytes)
}
/// Returns `max_val - min_val` using wrapping arithmetic.
///
/// Wrapping makes the result correct for signed keys that were cast to `u64`
/// (for example `-5 as u64` up to `10` yields `15`).
pub fn calculate_range(min_val: u64, max_val: u64) -> u64 {
    // The overflow flag is deliberately discarded: wrap-around is the intended result.
    let (span, _wrapped) = max_val.overflowing_sub(min_val);
    span
}
pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> {
let range = max_val.wrapping_sub(min_val);
if range >= usize::MAX as u64 {
return internal_err!("ArrayMap key range is too large to be allocated.");
}
let size = (range + 1) as usize;
let mut data: Vec<u32> = vec![0; size];
let mut next: Vec<u32> = vec![];
let mut num_of_distinct_key = 0;
downcast_supported_integer!(
array.data_type() => (
fill_data,
array,
min_val,
&mut data,
&mut next,
&mut num_of_distinct_key
)
)?;
Ok(Self {
data,
offset: min_val,
next,
num_of_distinct_key,
})
}
/// Populates the lookup table `data` (and, for duplicate keys, the chain `next`)
/// from the build-side key `array`.
///
/// Rows are visited from last to first. Each non-null key becomes the head of
/// its slot, and the previous head (a later row) is linked behind it through
/// `next`. The chain for a key therefore lists rows in ascending order.
/// `num_of_distinct_key` is incremented once per slot that was previously empty.
///
/// # Errors
///
/// Returns an internal error when the array has more rows than can be encoded
/// as a 1-based `u32` row number, or when a key maps outside of `data`.
fn fill_data<T: ArrowNumericType>(
    array: &ArrayRef,
    offset_val: u64,
    data: &mut [u32],
    next: &mut Vec<u32>,
    num_of_distinct_key: &mut usize,
) -> Result<()>
where
    T::Native: AsPrimitive<u64>,
{
    let arr = array.as_primitive::<T>();

    // Rows are stored as `row + 1` in a `u32`; reject inputs where that would
    // overflow instead of silently truncating row numbers.
    if arr.len() > u32::MAX as usize {
        return internal_err!(
            "ArrayMap supports at most {} build rows, but got {}",
            u32::MAX,
            arr.len()
        );
    }

    let slot_count = data.len() as u64;
    for (row, value) in arr.iter().enumerate().rev() {
        let Some(value) = value else {
            continue;
        };
        let key: u64 = value.as_();
        // Compare in `u64` before narrowing: on 32-bit targets an `as usize`
        // cast would truncate an out-of-range slot into a valid-looking index.
        let slot = key.wrapping_sub(offset_val);
        if slot >= slot_count {
            return internal_err!("failed build Array idx >= data.len()");
        }
        let slot = slot as usize;

        let previous_head = data[slot];
        if previous_head == 0 {
            *num_of_distinct_key += 1;
        } else {
            // First duplicate seen: allocate the chain, one link per build row.
            if next.is_empty() {
                *next = vec![0; arr.len()];
            }
            next[row] = previous_head;
        }
        data[slot] = row as u32 + 1;
    }
    Ok(())
}
/// Returns the number of distinct non-null keys counted while the map was built.
pub fn num_of_distinct_key(&self) -> usize {
self.num_of_distinct_key
}
/// Returns the number of bytes reserved by the map's two `u32` buffers,
/// computed from their capacities rather than their lengths.
pub fn size(&self) -> usize {
    let table_bytes = self.data.capacity() * size_of::<u32>();
    let chain_bytes = self.next.capacity() * size_of::<u32>();
    table_bytes + chain_bytes
}
/// Looks up the single probe-side key column in the map and writes the matching
/// `(probe row, build row)` pairs into `probe_indices` / `build_indices`,
/// starting from `current_offset` and producing at most `limit` pairs.
///
/// Returns `Some(offset)` to resume from when the limit was reached before the
/// probe column was exhausted, and `None` once every probe row has been handled.
///
/// # Errors
///
/// Returns an internal error unless exactly one key column of a supported
/// integer type is supplied.
pub fn get_matched_indices_with_limit_offset(
    &self,
    prob_side_keys: &[ArrayRef],
    limit: usize,
    current_offset: MapOffset,
    probe_indices: &mut Vec<u32>,
    build_indices: &mut Vec<u64>,
) -> Result<Option<MapOffset>> {
    match prob_side_keys {
        [array] => downcast_supported_integer!(
            array.data_type() => (
                lookup_and_get_indices,
                self,
                array,
                limit,
                current_offset,
                probe_indices,
                build_indices
            )
        ),
        keys => internal_err!(
            "ArrayMap expects 1 join key, but got {}",
            keys.len()
        ),
    }
}
/// Typed implementation of the probe lookup for a primitive integer column.
///
/// Both output vectors are cleared first. Probe rows are then scanned from
/// `current_offset` and every match is appended as a pair of
/// (`probe_indices`, `build_indices`) entries until `limit` pairs were produced.
///
/// `current_offset` is interpreted as follows:
/// * `(idx, None)`: processing of probe row `idx` has not started yet.
/// * `(idx, Some(0))`: probe row `idx` is finished; continue with `idx + 1`.
/// * `(idx, Some(n))`: resume the duplicate chain of probe row `idx` at the
///   1-based build row `n`, then continue with `idx + 1`.
///
/// Returns `Some(offset)` when the limit was hit and more work may remain,
/// otherwise `None`.
fn lookup_and_get_indices<T: ArrowNumericType>(
    &self,
    array: &ArrayRef,
    limit: usize,
    current_offset: MapOffset,
    probe_indices: &mut Vec<u32>,
    build_indices: &mut Vec<u64>,
) -> Result<Option<MapOffset>>
where
    T::Native: Copy + AsPrimitive<u64>,
{
    probe_indices.clear();
    build_indices.clear();

    let arr = array.as_primitive::<T>();
    let have_null = arr.null_count() > 0;
    let probe_len = arr.len();
    let slot_count = self.data.len() as u64;

    // Returns the 1-based head build row stored for `key`, or `None` when the key
    // lies outside the table or its slot is empty. The range check is done in
    // `u64` so that narrowing to `usize` can never truncate a far-away key into a
    // valid slot (which would report a false match on 32-bit targets).
    let head_of = |key: u64| -> Option<u32> {
        let slot = key.wrapping_sub(self.offset);
        if slot >= slot_count {
            return None;
        }
        match self.data[slot as usize] {
            0 => None,
            head => Some(head),
        }
    };

    if self.next.is_empty() {
        // No duplicate build keys: every probe row yields at most one pair.
        for prob_idx in current_offset.0..probe_len {
            if build_indices.len() == limit {
                return Ok(Some((prob_idx, None)));
            }
            if have_null && arr.is_null(prob_idx) {
                continue;
            }
            // SAFETY: `prob_idx` comes from a range bounded by `arr.len()`.
            let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_();
            if let Some(head) = head_of(prob_val) {
                build_indices.push(u64::from(head - 1));
                probe_indices.push(prob_idx as u32);
            }
        }
        return Ok(None);
    }

    // Duplicate build keys exist: matches are emitted by walking `next` chains.
    let mut remaining_output = limit;
    let to_skip = match current_offset {
        (idx, None) => idx,
        (idx, Some(0)) => idx + 1,
        (idx, Some(next_idx)) => {
            // Written as `idx + 1 == len` so an empty probe array cannot underflow.
            let is_last = idx + 1 == probe_len;
            if let Some(next_offset) = traverse_chain(
                &self.next,
                idx,
                next_idx as u32,
                &mut remaining_output,
                probe_indices,
                build_indices,
                is_last,
            ) {
                return Ok(Some(next_offset));
            }
            idx + 1
        }
    };

    for prob_side_idx in to_skip..probe_len {
        if remaining_output == 0 {
            return Ok(Some((prob_side_idx, None)));
        }
        if have_null && arr.is_null(prob_side_idx) {
            continue;
        }
        // SAFETY: `prob_side_idx` comes from a range bounded by `arr.len()`.
        let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_();
        let Some(build_idx) = head_of(prob_val) else {
            continue;
        };
        let is_last = prob_side_idx + 1 == probe_len;
        if let Some(offset) = traverse_chain(
            &self.next,
            prob_side_idx,
            build_idx,
            &mut remaining_output,
            probe_indices,
            build_indices,
            is_last,
        ) {
            return Ok(Some(offset));
        }
    }
    Ok(None)
}
/// Returns a boolean array with one entry per probe row that is `true` when the
/// row's key is present in the map.
///
/// # Errors
///
/// Returns an internal error unless exactly one key column of a supported
/// integer type is supplied.
pub fn contain_keys(&self, probe_side_keys: &[ArrayRef]) -> Result<BooleanArray> {
    match probe_side_keys {
        [array] => downcast_supported_integer!(
            array.data_type() => (
                contain_hashes_helper,
                self,
                array
            )
        ),
        keys => internal_err!(
            "ArrayMap join expects 1 join key, but got {}",
            keys.len()
        ),
    }
}
/// Typed implementation of [`ArrayMap::contain_keys`] for a primitive integer column.
///
/// The result has no null buffer: null probe rows are reported as `false`, as
/// are keys outside the table range and keys whose slot is empty.
fn contain_hashes_helper<T: ArrowNumericType>(
    &self,
    array: &ArrayRef,
) -> Result<BooleanArray>
where
    T::Native: AsPrimitive<u64>,
{
    let arr = array.as_primitive::<T>();
    let slot_count = self.data.len() as u64;
    let buffer = BooleanBuffer::collect_bool(arr.len(), |i| {
        if arr.is_null(i) {
            return false;
        }
        // SAFETY: `collect_bool` only calls the closure with `i < arr.len()`.
        let key: u64 = unsafe { arr.value_unchecked(i) }.as_();
        // Range-check in `u64` before narrowing so a far-away key cannot be
        // truncated into a valid slot on 32-bit targets.
        let slot = key.wrapping_sub(self.offset);
        slot < slot_count && self.data[slot as usize] != 0
    });
    Ok(BooleanArray::new(buffer, None))
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int32Array;
use arrow::array::Int64Array;
use std::sync::Arc;
/// With `limit = 1`, a duplicated build key must be returned one match per call,
/// and the returned offset must allow resuming inside the duplicate chain.
#[test]
fn test_array_map_limit_offset_duplicate_elements() -> Result<()> {
    // Key 1 occurs at build rows 0 and 1, key 2 at build row 2.
    let build_side: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
    let array_map = ArrayMap::try_new(&build_side, 1, 2)?;
    let probe_keys = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef];

    let mut probe_out = Vec::new();
    let mut build_out = Vec::new();
    let mut observed = vec![];

    // Keep calling with the previously returned offset until the map reports completion.
    let mut cursor = Some((0, None));
    while let Some(offset) = cursor {
        cursor = array_map.get_matched_indices_with_limit_offset(
            &probe_keys,
            1,
            offset,
            &mut probe_out,
            &mut build_out,
        )?;
        observed.push((probe_out.clone(), build_out.clone(), cursor));
    }

    let expected = vec![
        (vec![0], vec![0], Some((0, Some(2)))),
        (vec![0], vec![1], Some((0, Some(0)))),
        (vec![1], vec![2], None),
    ];
    assert_eq!(observed, expected);
    Ok(())
}
/// Probe keys that miss the table must be skipped without consuming the limit,
/// and the returned offset must point at the next unprocessed probe row.
#[test]
fn test_array_map_with_limit_and_misses() -> Result<()> {
    let build_side: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let array_map = ArrayMap::try_new(&build_side, 1, 2)?;
    // Probe row 0 (key 10) is outside the build range and never matches.
    let probe_keys = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef];

    let mut probe_out = Vec::new();
    let mut build_out = Vec::new();

    // First batch: skips the miss and stops after the single allowed match.
    let resume_at = array_map.get_matched_indices_with_limit_offset(
        &probe_keys,
        1,
        (0, None),
        &mut probe_out,
        &mut build_out,
    )?;
    assert_eq!(probe_out, vec![1]);
    assert_eq!(build_out, vec![0]);
    assert_eq!(resume_at, Some((2, None)));

    // Second batch: picks up at probe row 2 and exhausts the probe side.
    let resume_at = array_map.get_matched_indices_with_limit_offset(
        &probe_keys,
        1,
        resume_at.unwrap(),
        &mut probe_out,
        &mut build_out,
    )?;
    assert_eq!(probe_out, vec![2]);
    assert_eq!(build_out, vec![1]);
    assert!(resume_at.is_none());
    Ok(())
}
/// Combines duplicate build keys with probe misses: the limit is reached in the
/// middle of the last probe row's chain, so a chain-resume offset is returned.
#[test]
fn test_array_map_with_build_duplicates_and_misses() -> Result<()> {
    // Both build rows carry key 1.
    let build_side: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));
    let map = ArrayMap::try_new(&build_side, 1, 1)?;

    // Probe rows 0 and 2 miss; rows 1 and 3 each match both build rows.
    let probe_keys = [Arc::new(Int32Array::from(vec![10, 1, 20, 1])) as ArrayRef];

    let mut probe_out = Vec::new();
    let mut build_out = Vec::new();
    let resume_at = map.get_matched_indices_with_limit_offset(
        &probe_keys,
        3,
        (0, None),
        &mut probe_out,
        &mut build_out,
    )?;

    assert_eq!(probe_out, vec![1, 1, 3]);
    assert_eq!(build_out, vec![0, 1, 0]);
    assert_eq!(resume_at, Some((3, Some(2))));
    Ok(())
}
/// Negative `Int64` keys are handled through wrapping `u64` arithmetic: the
/// bounds are passed as their `u64` casts and lookups must still resolve.
#[test]
fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> {
    let build_side: ArrayRef = Arc::new(Int64Array::from(vec![-5, 0, 5, -2, 3, 10]));
    // Bounds of the build keys, reinterpreted as `u64` for the map.
    let lower = (-5_i64) as u64;
    let upper = 10_i64 as u64;
    let map = ArrayMap::try_new(&build_side, lower, upper)?;

    // The last probe key (-1) lies inside the range but is absent from the build side.
    let probe_side: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1]));
    let probe_keys = [Arc::clone(&probe_side)];

    let mut probe_out = Vec::new();
    let mut build_out = Vec::new();
    let resume_at = map.get_matched_indices_with_limit_offset(
        &probe_keys,
        10,
        (0, None),
        &mut probe_out,
        &mut build_out,
    )?;

    assert_eq!(probe_out, vec![0, 1, 2]);
    assert_eq!(build_out, vec![1, 0, 5]);
    assert!(resume_at.is_none());
    Ok(())
}
}