mod boolean;
mod bytes;
pub mod bytes_view;
pub mod primitive;
use std::mem::{self, size_of};
use crate::aggregates::group_values::GroupValues;
use crate::aggregates::group_values::multi_group_by::{
boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
};
use ahash::RandomState;
use arrow::array::{Array, ArrayRef};
use arrow::compute::cast;
use arrow::datatypes::{
BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, Schema, SchemaRef,
StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type,
UInt64Type,
};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{Result, internal_datafusion_err, not_impl_err};
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
use hashbrown::hash_table::HashTable;
/// High bit of a `GroupIndexView`: when set, the view's payload is an offset
/// into `group_index_lists` (multiple groups share the hash) rather than a
/// group index itself.
const NON_INLINED_FLAG: u64 = 0x8000000000000000;
/// Mask selecting the lower 63 payload bits of a `GroupIndexView`.
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
/// Stores a single column of group values used by `GroupValuesColumn`.
///
/// Implementations keep an append-only list of stored group values and can
/// compare stored rows against rows of an incoming `ArrayRef`, append new
/// rows, and emit the accumulated values as an Arrow array.
pub trait GroupColumn: Send + Sync {
    /// Returns true if the value stored at `lhs_row` in this builder equals
    /// the value at `rhs_row` of `array`.
    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
    /// Appends the value at `row` of `array` to this builder.
    fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()>;
    /// Vectorized form of [`Self::equal_to`]: compares each pair
    /// `(lhs_rows[i], rhs_rows[i])` and records the outcome in
    /// `equal_to_results[i]`. Callers in this file seed the result buffer
    /// with `true` and invoke this once per column, so implementations are
    /// expected to only ever clear (never set) entries.
    fn vectorized_equal_to(
        &self,
        lhs_rows: &[usize],
        array: &ArrayRef,
        rhs_rows: &[usize],
        equal_to_results: &mut [bool],
    );
    /// Vectorized form of [`Self::append_val`]: appends each listed row of
    /// `array` to this builder.
    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>;
    /// Number of group values stored so far.
    fn len(&self) -> usize;
    /// Returns true when no group values are stored.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Estimated size, in bytes, of this builder's buffers.
    fn size(&self) -> usize;
    /// Consumes the builder, producing all stored values as an `ArrayRef`.
    fn build(self: Box<Self>) -> ArrayRef;
    /// Removes and returns the first `n` stored values as an `ArrayRef`.
    fn take_n(&mut self, n: usize) -> ArrayRef;
}
/// Determines equality of two values purely from their null flags.
///
/// Returns `Some(true)` when both are null, `Some(false)` when exactly one
/// is null, and `None` when neither is null (the actual values must then be
/// compared by the caller).
pub fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
    if lhs_null && rhs_null {
        // Both null: equal by definition
        Some(true)
    } else if lhs_null || rhs_null {
        // Exactly one null: definitely unequal
        Some(false)
    } else {
        // Neither null: nullness alone cannot decide
        None
    }
}
/// Compact `u64` encoding of the group index (or indices) behind one hash
/// table entry.
///
/// Two layouts share the word:
/// - inlined (high bit clear): the payload IS the group index
/// - non-inlined (high bit set): the payload is an offset into
///   `group_index_lists`, used when several groups collide on one hash
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GroupIndexView(u64);
impl GroupIndexView {
    /// True when the payload is a list offset rather than a group index.
    #[inline]
    pub fn is_non_inlined(&self) -> bool {
        (self.0 & NON_INLINED_FLAG) != 0
    }
    /// Encodes a single group index directly (high bit stays clear).
    #[inline]
    pub fn new_inlined(group_index: u64) -> Self {
        Self(group_index)
    }
    /// Encodes an offset into `group_index_lists` (sets the high bit).
    #[inline]
    pub fn new_non_inlined(list_offset: u64) -> Self {
        Self(list_offset | NON_INLINED_FLAG)
    }
    /// The payload bits: a group index or a list offset, per the flag bit.
    #[inline]
    pub fn value(&self) -> u64 {
        self.0 & VALUE_MASK
    }
}
/// A `GroupValues` implementation storing grouping keys column-wise, with
/// one [`GroupColumn`] builder per field of `schema`.
///
/// When `STREAMING` is true the row-at-a-time (`scalarized_intern`) path is
/// used; otherwise the batched (`vectorized_intern`) path runs, which may
/// record multiple group indices per hash entry.
pub struct GroupValuesColumn<const STREAMING: bool> {
    /// Schema of the grouping columns
    schema: SchemaRef,
    /// Maps a row hash to its `GroupIndexView`
    map: HashTable<(u64, GroupIndexView)>,
    /// Tracked memory usage of `map`, maintained via `insert_accounted`
    map_size: usize,
    /// Group-index lists for hashes shared by more than one group
    /// (only written on the vectorized, non-STREAMING path)
    group_index_lists: Vec<Vec<usize>>,
    /// Scratch buffer used to remap `group_index_lists` during `emit`
    emit_group_index_list_buffer: Vec<usize>,
    /// Reusable scratch buffers for the vectorized intern path
    vectorized_operation_buffers: VectorizedOperationBuffers,
    /// One value builder per grouping column; lazily created in `intern`
    group_values: Vec<Box<dyn GroupColumn>>,
    /// Reusable buffer of per-row hashes
    hashes_buffer: Vec<u64>,
    /// Random state used by `create_hashes`
    random_state: RandomState,
}
/// Scratch buffers reused across calls to the vectorized intern path;
/// cleared (allocations retained) between batches.
#[derive(Default)]
struct VectorizedOperationBuffers {
    /// Rows whose hash was unseen and must be appended as new groups
    append_row_indices: Vec<usize>,
    /// Row side of the queued (row, candidate group) equality checks
    equal_to_row_indices: Vec<usize>,
    /// Group side of the queued (row, candidate group) equality checks
    equal_to_group_indices: Vec<usize>,
    /// Per-candidate results of the vectorized equality checks
    equal_to_results: Vec<bool>,
    /// Rows whose hash matched but no candidate group compared equal;
    /// re-processed by the scalarized fallback
    remaining_row_indices: Vec<usize>,
}
impl VectorizedOperationBuffers {
    /// Empties every scratch buffer, keeping their allocations for reuse.
    fn clear(&mut self) {
        self.remaining_row_indices.clear();
        self.equal_to_results.clear();
        self.equal_to_group_indices.clear();
        self.equal_to_row_indices.clear();
        self.append_row_indices.clear();
    }
}
impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
    /// Creates a new, empty `GroupValuesColumn` for the given schema.
    ///
    /// The per-column builders in `group_values` are created lazily on the
    /// first call to `intern`, once the actual columns are seen.
    pub fn try_new(schema: SchemaRef) -> Result<Self> {
        let map = HashTable::with_capacity(0);
        Ok(Self {
            schema,
            map,
            group_index_lists: Vec::new(),
            emit_group_index_list_buffer: Vec::new(),
            vectorized_operation_buffers: VectorizedOperationBuffers::default(),
            map_size: 0,
            group_values: vec![],
            hashes_buffer: Default::default(),
            random_state: crate::aggregates::AGGREGATION_HASH_SEED,
        })
    }
    /// Row-at-a-time intern path, used when `STREAMING` is true.
    ///
    /// For each input row: hash it, probe the map for an entry whose hash
    /// AND column values all match, and either reuse the found group index
    /// or append the row as a new group. On this path every map entry is
    /// inlined (exactly one group index per entry).
    fn scalarized_intern(
        &mut self,
        cols: &[ArrayRef],
        groups: &mut Vec<usize>,
    ) -> Result<()> {
        let n_rows = cols[0].len();
        groups.clear();
        // Reuse the hash buffer across batches
        let batch_hashes = &mut self.hashes_buffer;
        batch_hashes.clear();
        batch_hashes.resize(n_rows, 0);
        create_hashes(cols, &self.random_state, batch_hashes)?;
        for (row, &target_hash) in batch_hashes.iter().enumerate() {
            let entry = self
                .map
                .find_mut(target_hash, |(exist_hash, group_idx_view)| {
                    // Non-inlined views are only created on the vectorized path
                    debug_assert!(!group_idx_view.is_non_inlined());
                    if target_hash != *exist_hash {
                        return false;
                    }
                    // Local fn makes the per-column comparison explicit
                    fn check_row_equal(
                        array_row: &dyn GroupColumn,
                        lhs_row: usize,
                        array: &ArrayRef,
                        rhs_row: usize,
                    ) -> bool {
                        array_row.equal_to(lhs_row, array, rhs_row)
                    }
                    for (i, group_val) in self.group_values.iter().enumerate() {
                        if !check_row_equal(
                            group_val.as_ref(),
                            group_idx_view.value() as usize,
                            &cols[i],
                            row,
                        ) {
                            return false;
                        }
                    }
                    true
                });
            let group_idx = match entry {
                // Existing group: reuse its index
                Some((_hash, group_idx_view)) => group_idx_view.value() as usize,
                None => {
                    // New group: append the row to every column builder and
                    // record an inlined map entry
                    let mut checklen = 0;
                    let group_idx = self.group_values[0].len();
                    for (i, group_value) in self.group_values.iter_mut().enumerate() {
                        group_value.append_val(&cols[i], row)?;
                        let len = group_value.len();
                        if i == 0 {
                            checklen = len;
                        } else {
                            // All column builders must stay the same length
                            debug_assert_eq!(checklen, len);
                        }
                    }
                    self.map.insert_accounted(
                        (target_hash, GroupIndexView::new_inlined(group_idx as u64)),
                        |(hash, _group_index)| *hash,
                        &mut self.map_size,
                    );
                    group_idx
                }
            };
            groups.push(group_idx);
        }
        Ok(())
    }
    /// Batched intern path, used when `STREAMING` is false.
    ///
    /// Runs in phases: hash all rows, classify each row as "append as new
    /// group" or "check against candidate groups", then perform the appends
    /// and equality checks vectorized over the whole batch. Rows whose hash
    /// collided but matched no candidate fall through to the scalarized
    /// fallback in `scalarized_intern_remaining`.
    fn vectorized_intern(
        &mut self,
        cols: &[ArrayRef],
        groups: &mut Vec<usize>,
    ) -> Result<()> {
        let n_rows = cols[0].len();
        groups.clear();
        // usize::MAX marks "group not yet assigned" for each row
        groups.resize(n_rows, usize::MAX);
        // Take the buffer so `self` can be mutably borrowed below
        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
        batch_hashes.clear();
        batch_hashes.resize(n_rows, 0);
        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
        self.collect_vectorized_process_context(&batch_hashes, groups);
        self.vectorized_append(cols)?;
        self.vectorized_equal_to(cols, groups);
        self.scalarized_intern_remaining(cols, &batch_hashes, groups)?;
        // Return the buffer for reuse on the next batch
        self.hashes_buffer = batch_hashes;
        Ok(())
    }
    /// Phase 1 of the vectorized path: probe the map by hash only.
    ///
    /// Rows with an unseen hash get a fresh group index immediately (queued
    /// for `vectorized_append`); rows with a seen hash queue one
    /// (row, candidate group) pair per possible group for the vectorized
    /// equality check.
    fn collect_vectorized_process_context(
        &mut self,
        batch_hashes: &[u64],
        groups: &mut [usize],
    ) {
        self.vectorized_operation_buffers.append_row_indices.clear();
        self.vectorized_operation_buffers
            .equal_to_row_indices
            .clear();
        self.vectorized_operation_buffers
            .equal_to_group_indices
            .clear();
        let mut group_values_len = self.group_values[0].len();
        for (row, &target_hash) in batch_hashes.iter().enumerate() {
            // Hash-only probe; value equality is verified later, vectorized
            let entry = self
                .map
                .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
            let Some((_, group_index_view)) = entry else {
                // Unseen hash: assign the next group index and queue the row
                // for appending to the column builders
                let current_group_idx = group_values_len;
                let group_index_view =
                    GroupIndexView::new_inlined(current_group_idx as u64);
                self.map.insert_accounted(
                    (target_hash, group_index_view),
                    |(hash, _)| *hash,
                    &mut self.map_size,
                );
                self.vectorized_operation_buffers
                    .append_row_indices
                    .push(row);
                groups[row] = current_group_idx;
                group_values_len += 1;
                continue;
            };
            if group_index_view.is_non_inlined() {
                // Several groups share this hash: all of them are candidates
                let list_offset = group_index_view.value() as usize;
                let group_index_list = &self.group_index_lists[list_offset];
                self.vectorized_operation_buffers
                    .equal_to_group_indices
                    .extend_from_slice(group_index_list);
                self.vectorized_operation_buffers
                    .equal_to_row_indices
                    .extend(std::iter::repeat_n(row, group_index_list.len()));
            } else {
                // Single candidate group for this hash
                let group_index = group_index_view.value() as usize;
                self.vectorized_operation_buffers
                    .equal_to_row_indices
                    .push(row);
                self.vectorized_operation_buffers
                    .equal_to_group_indices
                    .push(group_index);
            }
        }
    }
    /// Phase 2 of the vectorized path: append every row queued in
    /// `append_row_indices` to each column builder.
    fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()> {
        if self
            .vectorized_operation_buffers
            .append_row_indices
            .is_empty()
        {
            return Ok(());
        }
        let iter = self.group_values.iter_mut().zip(cols.iter());
        for (group_column, col) in iter {
            group_column.vectorized_append(
                col,
                &self.vectorized_operation_buffers.append_row_indices,
            )?;
        }
        Ok(())
    }
    /// Phase 3 of the vectorized path: run the equality checks queued in
    /// phase 1 and reduce them to a per-row verdict.
    ///
    /// Rows where some candidate group compared equal get that group index
    /// written into `groups`; rows where every candidate failed are pushed
    /// to `remaining_row_indices` for the scalarized fallback. Candidates
    /// for the same row are contiguous in the buffers, so a row is finalized
    /// once the next queued row index differs.
    fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) {
        assert_eq!(
            self.vectorized_operation_buffers
                .equal_to_group_indices
                .len(),
            self.vectorized_operation_buffers.equal_to_row_indices.len()
        );
        self.vectorized_operation_buffers
            .remaining_row_indices
            .clear();
        if self
            .vectorized_operation_buffers
            .equal_to_group_indices
            .is_empty()
        {
            return;
        }
        // Take the result buffer to avoid aliasing `self` in the loop below
        let mut equal_to_results =
            mem::take(&mut self.vectorized_operation_buffers.equal_to_results);
        equal_to_results.clear();
        // Seed with `true`; each column narrows the per-candidate result
        equal_to_results.resize(
            self.vectorized_operation_buffers
                .equal_to_group_indices
                .len(),
            true,
        );
        for (col_idx, group_col) in self.group_values.iter().enumerate() {
            group_col.vectorized_equal_to(
                &self.vectorized_operation_buffers.equal_to_group_indices,
                &cols[col_idx],
                &self.vectorized_operation_buffers.equal_to_row_indices,
                &mut equal_to_results,
            );
        }
        // Reduce per-candidate results into a per-row verdict
        let mut current_row_equal_to_result = false;
        for (idx, &row) in self
            .vectorized_operation_buffers
            .equal_to_row_indices
            .iter()
            .enumerate()
        {
            let equal_to_result = equal_to_results[idx];
            if equal_to_result {
                groups[row] =
                    self.vectorized_operation_buffers.equal_to_group_indices[idx];
            }
            current_row_equal_to_result |= equal_to_result;
            // usize::MAX sentinel marks the end of the queue
            let next_row = self
                .vectorized_operation_buffers
                .equal_to_row_indices
                .get(idx + 1)
                .unwrap_or(&usize::MAX);
            if row != *next_row {
                if !current_row_equal_to_result {
                    // No candidate matched: defer to the scalarized fallback
                    self.vectorized_operation_buffers
                        .remaining_row_indices
                        .push(row);
                }
                current_row_equal_to_result = false;
            }
        }
        // Give the buffer back for reuse
        self.vectorized_operation_buffers.equal_to_results = equal_to_results;
    }
    /// Phase 4 of the vectorized path: scalarized fallback for rows whose
    /// hash matched but whose values matched no queued candidate.
    ///
    /// A row here may still equal a group created earlier within this same
    /// phase, so each row is re-checked before being appended as a new
    /// group. When a new group is added under an existing hash, the map
    /// entry is upgraded from an inlined index to a non-inlined list stored
    /// in `group_index_lists`.
    fn scalarized_intern_remaining(
        &mut self,
        cols: &[ArrayRef],
        batch_hashes: &[u64],
        groups: &mut [usize],
    ) -> Result<()> {
        if self
            .vectorized_operation_buffers
            .remaining_row_indices
            .is_empty()
        {
            return Ok(());
        }
        // Take the map so it can be mutated while `self` is also borrowed
        let mut map = mem::take(&mut self.map);
        for &row in &self.vectorized_operation_buffers.remaining_row_indices {
            let target_hash = batch_hashes[row];
            let entry = map.find_mut(target_hash, |(exist_hash, _)| {
                target_hash == *exist_hash
            });
            // Phase 1 inserted an entry for every hash in the batch
            let Some((_, group_index_view)) = entry else {
                unreachable!()
            };
            // The row may match a group appended earlier in this loop
            if self.scalarized_equal_to_remaining(group_index_view, cols, row, groups) {
                continue;
            }
            // Still no match: append as a new group
            let group_idx = self.group_values[0].len();
            let mut checklen = 0;
            for (i, group_value) in self.group_values.iter_mut().enumerate() {
                group_value.append_val(&cols[i], row)?;
                let len = group_value.len();
                if i == 0 {
                    checklen = len;
                } else {
                    // All column builders must stay the same length
                    debug_assert_eq!(checklen, len);
                }
            }
            if group_index_view.is_non_inlined() {
                // Hash already maps to a list: append the new group index
                let list_offset = group_index_view.value() as usize;
                let group_index_list = &mut self.group_index_lists[list_offset];
                group_index_list.push(group_idx);
            } else {
                // Upgrade the inlined entry to a two-element list
                let list_offset = self.group_index_lists.len();
                let exist_group_index = group_index_view.value() as usize;
                let new_group_index_list = vec![exist_group_index, group_idx];
                self.group_index_lists.push(new_group_index_list);
                let new_group_index_view =
                    GroupIndexView::new_non_inlined(list_offset as u64);
                *group_index_view = new_group_index_view;
            }
            groups[row] = group_idx;
        }
        // Put the map back
        self.map = map;
        Ok(())
    }
    /// Compares `row` of `cols` against every group referenced by
    /// `group_index_view`; on a match, records that group in `groups` and
    /// returns true.
    fn scalarized_equal_to_remaining(
        &self,
        group_index_view: &GroupIndexView,
        cols: &[ArrayRef],
        row: usize,
        groups: &mut [usize],
    ) -> bool {
        // Local fn makes the per-column comparison explicit
        fn check_row_equal(
            array_row: &dyn GroupColumn,
            lhs_row: usize,
            array: &ArrayRef,
            rhs_row: usize,
        ) -> bool {
            array_row.equal_to(lhs_row, array, rhs_row)
        }
        if group_index_view.is_non_inlined() {
            // Multiple candidate groups share the hash: try each in turn
            let list_offset = group_index_view.value() as usize;
            let group_index_list = &self.group_index_lists[list_offset];
            for &group_idx in group_index_list {
                let mut check_result = true;
                for (i, group_val) in self.group_values.iter().enumerate() {
                    if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
                        check_result = false;
                        break;
                    }
                }
                if check_result {
                    groups[row] = group_idx;
                    return true;
                }
            }
            false
        } else {
            // Single candidate group
            let group_idx = group_index_view.value() as usize;
            for (i, group_val) in self.group_values.iter().enumerate() {
                if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
                    return false;
                }
            }
            groups[row] = group_idx;
            true
        }
    }
    /// Test helper: resolves the map entry for `hash` to the concrete list
    /// of group indices (expanding non-inlined views through
    /// `group_index_lists`) plus the raw stored view.
    #[cfg(test)]
    fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
        let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);
        match entry {
            Some((_, group_index_view)) => {
                if group_index_view.is_non_inlined() {
                    let list_offset = group_index_view.value() as usize;
                    Some((
                        self.group_index_lists[list_offset].clone(),
                        *group_index_view,
                    ))
                } else {
                    let group_index = group_index_view.value() as usize;
                    Some((vec![group_index], *group_index_view))
                }
            }
            None => None,
        }
    }
}
/// Pushes a `PrimitiveGroupValueBuilder` for Arrow type `$t` onto `$v`,
/// monomorphized on whether the column is nullable (the const-bool type
/// parameter of the builder).
macro_rules! instantiate_primitive {
    ($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
        if $nullable {
            let b = PrimitiveGroupValueBuilder::<$t, true>::new($data_type.to_owned());
            $v.push(Box::new(b) as _)
        } else {
            let b = PrimitiveGroupValueBuilder::<$t, false>::new($data_type.to_owned());
            $v.push(Box::new(b) as _)
        }
    };
}
impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
    /// Maps each row of `cols` to a group index in `groups`, creating new
    /// groups as needed.
    ///
    /// On the first call, lazily instantiates one [`GroupColumn`] builder
    /// per schema field; then dispatches to the vectorized or scalarized
    /// intern path depending on `STREAMING`.
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
        if self.group_values.is_empty() {
            let mut v = Vec::with_capacity(cols.len());
            for f in self.schema.fields().iter() {
                let nullable = f.is_nullable();
                let data_type = f.data_type();
                match data_type {
                    &DataType::Int8 => {
                        instantiate_primitive!(v, nullable, Int8Type, data_type)
                    }
                    &DataType::Int16 => {
                        instantiate_primitive!(v, nullable, Int16Type, data_type)
                    }
                    &DataType::Int32 => {
                        instantiate_primitive!(v, nullable, Int32Type, data_type)
                    }
                    &DataType::Int64 => {
                        instantiate_primitive!(v, nullable, Int64Type, data_type)
                    }
                    &DataType::UInt8 => {
                        instantiate_primitive!(v, nullable, UInt8Type, data_type)
                    }
                    &DataType::UInt16 => {
                        instantiate_primitive!(v, nullable, UInt16Type, data_type)
                    }
                    &DataType::UInt32 => {
                        instantiate_primitive!(v, nullable, UInt32Type, data_type)
                    }
                    &DataType::UInt64 => {
                        instantiate_primitive!(v, nullable, UInt64Type, data_type)
                    }
                    &DataType::Float32 => {
                        instantiate_primitive!(v, nullable, Float32Type, data_type)
                    }
                    &DataType::Float64 => {
                        instantiate_primitive!(v, nullable, Float64Type, data_type)
                    }
                    &DataType::Date32 => {
                        instantiate_primitive!(v, nullable, Date32Type, data_type)
                    }
                    &DataType::Date64 => {
                        instantiate_primitive!(v, nullable, Date64Type, data_type)
                    }
                    &DataType::Time32(t) => match t {
                        TimeUnit::Second => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time32SecondType,
                                data_type
                            )
                        }
                        TimeUnit::Millisecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time32MillisecondType,
                                data_type
                            )
                        }
                        // Time32 only supports second/millisecond units
                        _ => {}
                    },
                    &DataType::Time64(t) => match t {
                        TimeUnit::Microsecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time64MicrosecondType,
                                data_type
                            )
                        }
                        TimeUnit::Nanosecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time64NanosecondType,
                                data_type
                            )
                        }
                        // Time64 only supports microsecond/nanosecond units
                        _ => {}
                    },
                    &DataType::Timestamp(t, _) => match t {
                        TimeUnit::Second => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampSecondType,
                                data_type
                            )
                        }
                        TimeUnit::Millisecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampMillisecondType,
                                data_type
                            )
                        }
                        TimeUnit::Microsecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampMicrosecondType,
                                data_type
                            )
                        }
                        TimeUnit::Nanosecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampNanosecondType,
                                data_type
                            )
                        }
                    },
                    &DataType::Decimal128(_, _) => {
                        instantiate_primitive! {
                            v,
                            nullable,
                            Decimal128Type,
                            data_type
                        }
                    }
                    &DataType::Utf8 => {
                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::LargeUtf8 => {
                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Utf8);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Binary => {
                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Binary);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::LargeBinary => {
                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Utf8View => {
                        let b = ByteViewGroupValueBuilder::<StringViewType>::new();
                        v.push(Box::new(b) as _)
                    }
                    &DataType::BinaryView => {
                        let b = ByteViewGroupValueBuilder::<BinaryViewType>::new();
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Boolean => {
                        if nullable {
                            let b = BooleanGroupValueBuilder::<true>::new();
                            v.push(Box::new(b) as _)
                        } else {
                            let b = BooleanGroupValueBuilder::<false>::new();
                            v.push(Box::new(b) as _)
                        }
                    }
                    dt => {
                        return not_impl_err!("{dt} not supported in GroupValuesColumn");
                    }
                }
            }
            self.group_values = v;
        }
        if !STREAMING {
            self.vectorized_intern(cols, groups)
        } else {
            self.scalarized_intern(cols, groups)
        }
    }
    /// Estimated memory usage: column builders + map + hash buffer.
    fn size(&self) -> usize {
        let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
        group_values_size + self.map_size + self.hashes_buffer.allocated_size()
    }
    /// True when no groups have been stored yet.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Number of stored groups (length of the first column builder, since
    /// all builders are kept at the same length).
    fn len(&self) -> usize {
        if self.group_values.is_empty() {
            return 0;
        }
        self.group_values[0].len()
    }
    /// Emits accumulated group values as arrays: either all groups, or the
    /// first `n`.
    ///
    /// For `EmitTo::First(n)`, surviving map entries have their group
    /// indices shifted down by `n`; on the non-STREAMING path the
    /// non-inlined lists in `group_index_lists` are filtered and compacted
    /// in the same `retain` pass.
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let mut output = match emit_to {
            EmitTo::All => {
                let group_values = mem::take(&mut self.group_values);
                debug_assert!(self.group_values.is_empty());
                group_values
                    .into_iter()
                    .map(|v| v.build())
                    .collect::<Vec<_>>()
            }
            EmitTo::First(n) => {
                let output = self
                    .group_values
                    .iter_mut()
                    .map(|v| v.take_n(n))
                    .collect::<Vec<_>>();
                // Next free slot while compacting `group_index_lists`
                let mut next_new_list_offset = 0;
                self.map.retain(|(_exist_hash, group_idx_view)| {
                    if !STREAMING && group_idx_view.is_non_inlined() {
                        // Keep only group indices >= n, shifted down by n
                        self.emit_group_index_list_buffer.clear();
                        let list_offset = group_idx_view.value() as usize;
                        for group_index in self.group_index_lists[list_offset].iter() {
                            if let Some(remaining) = group_index.checked_sub(n) {
                                self.emit_group_index_list_buffer.push(remaining);
                            }
                        }
                        if self.emit_group_index_list_buffer.is_empty() {
                            // Every group in the list was emitted: drop entry
                            false
                        } else if self.emit_group_index_list_buffer.len() == 1 {
                            // Single survivor: downgrade to an inlined view
                            let group_index =
                                self.emit_group_index_list_buffer.first().unwrap();
                            *group_idx_view =
                                GroupIndexView::new_inlined(*group_index as u64);
                            true
                        } else {
                            // Multiple survivors: compact into the next slot
                            let group_index_list =
                                &mut self.group_index_lists[next_new_list_offset];
                            group_index_list.clear();
                            group_index_list
                                .extend(self.emit_group_index_list_buffer.iter());
                            *group_idx_view = GroupIndexView::new_non_inlined(
                                next_new_list_offset as u64,
                            );
                            next_new_list_offset += 1;
                            true
                        }
                    } else {
                        // STREAMING mode never creates non-inlined views
                        debug_assert!(!group_idx_view.is_non_inlined());
                        let group_index = group_idx_view.value() as usize;
                        match group_index.checked_sub(n) {
                            Some(sub) => {
                                *group_idx_view = GroupIndexView::new_inlined(sub as u64);
                                true
                            }
                            None => false,
                        }
                    }
                });
                if !STREAMING {
                    // Drop list slots freed by the compaction above
                    self.group_index_lists.truncate(next_new_list_offset);
                }
                output
            }
        };
        // Cast dictionary-typed fields back to their expected output type
        for (field, array) in self.schema.fields.iter().zip(&mut output) {
            let expected = field.data_type();
            if let DataType::Dictionary(_, v) = expected {
                let actual = array.data_type();
                if v.as_ref() != actual {
                    return Err(internal_datafusion_err!(
                        "Converted group rows expected dictionary of {v} got {actual}"
                    ));
                }
                *array = cast(array.as_ref(), expected)?;
            }
        }
        Ok(output)
    }
    /// Clears all state, shrinking buffers toward `num_rows` for reuse.
    fn clear_shrink(&mut self, num_rows: usize) {
        self.group_values.clear();
        self.map.clear();
        self.map.shrink_to(num_rows, |_| 0);
        // Re-derive tracked map size from the (possibly shrunk) capacity
        self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
        self.hashes_buffer.clear();
        self.hashes_buffer.shrink_to(num_rows);
        if !STREAMING {
            self.group_index_lists.clear();
            self.emit_group_index_list_buffer.clear();
            self.vectorized_operation_buffers.clear();
        }
    }
}
pub fn supported_schema(schema: &Schema) -> bool {
schema
.fields()
.iter()
.map(|f| f.data_type())
.all(supported_type)
}
/// Returns true if [`GroupValuesColumn`] supports grouping on `data_type`.
///
/// Fix: `DataType::Time64(_)` was missing from this list even though
/// `intern` instantiates builders for `Time64(Microsecond)` and
/// `Time64(Nanosecond)`, so schemas containing Time64 columns were
/// incorrectly rejected by `supported_schema`. (Arrow restricts Time64 to
/// exactly those two units.)
fn supported_type(data_type: &DataType) -> bool {
    matches!(
        *data_type,
        DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Float32
            | DataType::Float64
            | DataType::Decimal128(_, _)
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Date32
            | DataType::Date64
            | DataType::Time32(_)
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::Utf8View
            | DataType::BinaryView
            | DataType::Boolean
    )
}
/// Null distribution of a set of values: all null, some null, or none null.
///
/// NOTE(review): not referenced anywhere in this file; presumably consumed
/// by the child builder modules (`boolean`, `bytes`, `bytes_view`,
/// `primitive`) via `super::Nulls` — confirm before removing.
enum Nulls {
    All,
    Some,
    None,
}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};
    use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
    use datafusion_common::utils::proxy::HashTableAllocExt;
    use datafusion_expr::EmitTo;
    use crate::aggregates::group_values::{
        GroupValues, multi_group_by::GroupValuesColumn,
    };
    use super::GroupIndexView;
    /// Interns several overlapping batches through the vectorized path and
    /// verifies `EmitTo::All` produces the expected distinct groups.
    #[test]
    fn test_intern_for_vectorized_group_values() {
        let data_set = VectorizedTestDataSet::new();
        let mut group_values =
            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
        data_set.load_to_group_values(&mut group_values);
        let actual_batch = group_values.emit(EmitTo::All).unwrap();
        let actual_batch = RecordBatch::try_new(data_set.schema(), actual_batch).unwrap();
        check_result(&actual_batch, &data_set.expected_batch);
    }
    /// Re-interns and emits via `EmitTo::First(n)` in 1..=num_rows chunks,
    /// checking the concatenated sub-batches always equal the full result.
    #[test]
    fn test_emit_first_n_for_vectorized_group_values() {
        let data_set = VectorizedTestDataSet::new();
        let mut group_values =
            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
        let num_rows = data_set.expected_batch.num_rows();
        let schema = data_set.schema();
        for times_to_take in 1..=num_rows {
            data_set.load_to_group_values(&mut group_values);
            let suggest_num_emit = data_set.expected_batch.num_rows() / times_to_take;
            let mut num_remaining_rows = num_rows;
            let mut actual_sub_batches = Vec::new();
            for nth_time in 0..times_to_take {
                // Last take drains whatever is left
                let num_emit = if nth_time == times_to_take - 1 {
                    num_remaining_rows
                } else {
                    suggest_num_emit
                };
                let sub_batch = group_values.emit(EmitTo::First(num_emit)).unwrap();
                let sub_batch =
                    RecordBatch::try_new(Arc::clone(&schema), sub_batch).unwrap();
                actual_sub_batches.push(sub_batch);
                num_remaining_rows -= num_emit;
            }
            assert!(num_remaining_rows == 0);
            let actual_batch = concat_batches(&schema, &actual_sub_batches).unwrap();
            check_result(&actual_batch, &data_set.expected_batch);
        }
    }
    /// Seeds the hash table with hand-built inlined and non-inlined views,
    /// then checks that `EmitTo::First(n)` shifts, downgrades, compacts and
    /// drops the entries exactly as expected.
    #[test]
    fn test_hashtable_modifying_in_emit_first_n() {
        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
        let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
        // hash N -> the group indices listed below
        insert_inline_group_index_view(&mut group_values, 0, 0);
        insert_non_inline_group_index_view(&mut group_values, 1, vec![1, 2]);
        insert_non_inline_group_index_view(&mut group_values, 2, vec![3, 4, 5]);
        insert_inline_group_index_view(&mut group_values, 3, 6);
        insert_non_inline_group_index_view(&mut group_values, 4, vec![7, 8]);
        insert_non_inline_group_index_view(&mut group_values, 5, vec![9, 10, 11]);
        // Sanity-check the initial layout
        assert_eq!(
            group_values.get_indices_by_hash(0).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(1).unwrap(),
            (vec![1, 2], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![3, 4, 5], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![6], GroupIndexView::new_inlined(6))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![7, 8], GroupIndexView::new_non_inlined(2))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![9, 10, 11], GroupIndexView::new_non_inlined(3))
        );
        assert_eq!(group_values.map.len(), 6);
        // Emit 4: hashes 0 and 1 are fully consumed; hash 2 loses one group
        let _ = group_values.emit(EmitTo::First(4)).unwrap();
        assert!(group_values.get_indices_by_hash(0).is_none());
        assert!(group_values.get_indices_by_hash(1).is_none());
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![0, 1], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![2], GroupIndexView::new_inlined(2))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![3, 4], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![5, 6, 7], GroupIndexView::new_non_inlined(2))
        );
        assert_eq!(group_values.map.len(), 4);
        // Emit 1: hash 2's list shrinks to one entry and is inlined
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![1], GroupIndexView::new_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![2, 3], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![4, 5, 6], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(group_values.map.len(), 4);
        // Emit 5: only hash 5 survives, with two remaining groups
        let _ = group_values.emit(EmitTo::First(5)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![0, 1], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(group_values.map.len(), 1);
        // Emit 1: hash 5's list shrinks to one entry and is inlined
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(group_values.map.len(), 1);
        // Emit 1: everything is consumed
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert!(group_values.map.is_empty());
    }
    /// Three input batches with overlapping groups, plus the expected
    /// distinct-group result after interning all of them.
    struct VectorizedTestDataSet {
        test_batches: Vec<Vec<ArrayRef>>,
        expected_batch: RecordBatch,
    }
    impl VectorizedTestDataSet {
        fn new() -> Self {
            // Batch 1: rows repeat within the batch to exercise dedup
            let col1 = Int64Array::from(vec![
                Some(42),
                None,
                None,
                Some(1142),
                None,
                Some(42),
                None,
                None,
                Some(1142),
                None,
                Some(4211),
                None,
                None,
                Some(4212),
            ]);
            let col2 = StringArray::from(vec![
                Some("string1"),
                None,
                Some("string2"),
                None,
                None,
                Some("string1"),
                None,
                Some("string2"),
                None,
                None,
                Some("string3"),
                None,
                Some("string4"),
                None,
            ]);
            let col3 = StringViewArray::from(vec![
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview3"),
                Some("stringview4"),
                None,
                None,
            ]);
            let batch1 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];
            // Batch 2: overlaps batch 1 on some groups, adds new ones
            let col1 = Int64Array::from(vec![
                Some(42),
                None,
                None,
                Some(21142),
                None,
                Some(42),
                None,
                None,
                Some(21142),
                None,
                Some(4211),
                None,
                None,
                Some(24212),
            ]);
            let col2 = StringArray::from(vec![
                Some("string1"),
                None,
                Some("2string2"),
                None,
                None,
                Some("string1"),
                None,
                Some("2string2"),
                None,
                None,
                Some("string3"),
                None,
                Some("2string4"),
                None,
            ]);
            let col3 = StringViewArray::from(vec![
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview3"),
                Some("stringview4"),
                None,
                None,
            ]);
            let batch2 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];
            // Batch 3: same overlap pattern with different new values
            let col1 = Int64Array::from(vec![
                Some(42),
                None,
                None,
                Some(31142),
                None,
                Some(42),
                None,
                None,
                Some(31142),
                None,
                Some(4211),
                None,
                None,
                Some(34212),
            ]);
            let col2 = StringArray::from(vec![
                Some("string1"),
                None,
                Some("3string2"),
                None,
                None,
                Some("string1"),
                None,
                Some("3string2"),
                None,
                None,
                Some("string3"),
                None,
                Some("3string4"),
                None,
            ]);
            let col3 = StringViewArray::from(vec![
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                Some("stringview3"),
                Some("stringview4"),
                None,
                None,
            ]);
            let batch3 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];
            let schema = Arc::new(Schema::new(vec![
                Field::new("a", DataType::Int64, true),
                Field::new("b", DataType::Utf8, true),
                Field::new("c", DataType::Utf8View, true),
            ]));
            // The distinct groups across all three batches
            let col1 = Int64Array::from(vec![
                Some(42),
                None,
                None,
                Some(1142),
                None,
                Some(21142),
                None,
                Some(31142),
                None,
                Some(4211),
                None,
                None,
                Some(4212),
                None,
                Some(24212),
                None,
                Some(34212),
            ]);
            let col2 = StringArray::from(vec![
                Some("string1"),
                None,
                Some("string2"),
                None,
                Some("2string2"),
                None,
                Some("3string2"),
                None,
                None,
                Some("string3"),
                None,
                Some("string4"),
                None,
                Some("2string4"),
                None,
                Some("3string4"),
                None,
            ]);
            let col3 = StringViewArray::from(vec![
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                Some("stringview3"),
                Some("stringview4"),
                None,
                None,
                None,
                None,
                None,
                None,
            ]);
            let expected_batch = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];
            let expected_batch = RecordBatch::try_new(schema, expected_batch).unwrap();
            Self {
                test_batches: vec![batch1, batch2, batch3],
                expected_batch,
            }
        }
        /// Interns all test batches into `group_values`.
        fn load_to_group_values(&self, group_values: &mut impl GroupValues) {
            for batch in self.test_batches.iter() {
                group_values.intern(batch, &mut vec![]).unwrap();
            }
        }
        fn schema(&self) -> SchemaRef {
            self.expected_batch.schema()
        }
    }
    /// Compares two batches row-set-wise: rows are pretty-printed, sorted,
    /// and compared line by line (group output order is not deterministic).
    fn check_result(actual_batch: &RecordBatch, expected_batch: &RecordBatch) {
        let formatted_actual_batch =
            pretty_format_batches(std::slice::from_ref(actual_batch))
                .unwrap()
                .to_string();
        let mut formatted_actual_batch_sorted: Vec<&str> =
            formatted_actual_batch.trim().lines().collect();
        formatted_actual_batch_sorted.sort_unstable();
        let formatted_expected_batch =
            pretty_format_batches(std::slice::from_ref(expected_batch))
                .unwrap()
                .to_string();
        let mut formatted_expected_batch_sorted: Vec<&str> =
            formatted_expected_batch.trim().lines().collect();
        formatted_expected_batch_sorted.sort_unstable();
        for (i, (actual_line, expected_line)) in formatted_actual_batch_sorted
            .iter()
            .zip(&formatted_expected_batch_sorted)
            .enumerate()
        {
            assert_eq!(
                (i, actual_line),
                (i, expected_line),
                "Inconsistent result\n\n\
                 Actual batch:\n{formatted_actual_batch}\n\
                 Expected batch:\n{formatted_expected_batch}\n\
                ",
            );
        }
    }
    /// Inserts an inlined (single group index) entry under `hash_key`.
    fn insert_inline_group_index_view(
        group_values: &mut GroupValuesColumn<false>,
        hash_key: u64,
        group_index: u64,
    ) {
        let group_index_view = GroupIndexView::new_inlined(group_index);
        group_values.map.insert_accounted(
            (hash_key, group_index_view),
            |(hash, _)| *hash,
            &mut group_values.map_size,
        );
    }
    /// Inserts a non-inlined entry under `hash_key`, appending its group
    /// index list to `group_index_lists`.
    fn insert_non_inline_group_index_view(
        group_values: &mut GroupValuesColumn<false>,
        hash_key: u64,
        group_indices: Vec<usize>,
    ) {
        let list_offset = group_values.group_index_lists.len();
        let group_index_view = GroupIndexView::new_non_inlined(list_offset as u64);
        group_values.group_index_lists.push(group_indices);
        group_values.map.insert_accounted(
            (hash_key, group_index_view),
            |(hash, _)| *hash,
            &mut group_values.map_size,
        );
    }
}