use crate::array::{
as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array,
Array, ArrayRef, Decimal128Array, Decimal256Array,
};
use crate::compute::SortOptions;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::row::interner::{Interned, OrderPreservingInterner};
use crate::util::decimal::{Decimal128, Decimal256};
use crate::{downcast_dictionary_array, downcast_primitive_array};
mod fixed;
mod interner;
mod variable;
/// Converts columns of [`ArrayRef`] into a row-oriented byte format ([`Rows`]) whose
/// byte-wise ordering matches the lexicographic ordering of the input columns under each
/// field's [`SortOptions`].
#[derive(Debug)]
pub struct RowConverter {
    /// The expected data type and sort options of each column, by position.
    fields: Vec<SortField>,
    /// One slot per field. An interner is created lazily the first time a dictionary column
    /// is converted at that position, and it is retained across `convert_columns` calls.
    interners: Vec<Option<Box<OrderPreservingInterner>>>,
}
/// Describes one column passed to a [`RowConverter`]: its data type and how it sorts.
#[derive(Debug, Clone)]
pub struct SortField {
    /// Sort direction (`descending`) and null placement (`nulls_first`) for this column.
    options: SortOptions,
    /// The data type that the column at this position must have.
    data_type: DataType,
}
impl SortField {
pub fn new(data_type: DataType) -> Self {
Self::new_with_options(data_type, Default::default())
}
pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
Self { options, data_type }
}
}
impl RowConverter {
    /// Creates a new [`RowConverter`] that encodes columns described by `fields`.
    ///
    /// No interners are allocated here. One is created on demand the first time a
    /// dictionary column is converted at a given field position.
    pub fn new(fields: Vec<SortField>) -> Self {
        let interners = (0..fields.len()).map(|_| None).collect();
        Self { fields, interners }
    }

    /// Converts `columns` into [`Rows`], producing one encoded row per input row.
    ///
    /// Dictionary columns are interned into this converter's per-field
    /// [`OrderPreservingInterner`]. The interner is kept between calls, so rows produced by
    /// separate calls on the same converter can be compared with each other.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] in three cases:
    /// - the number of columns differs from the number of fields;
    /// - the columns do not all have the same length;
    /// - a column's data type does not match its field.
    ///
    /// Returns [`ArrowError::NotYetImplemented`] for data types (or dictionary value types)
    /// that have no row encoding.
    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows> {
        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // The output is sized from the first column's length, so every column must agree
        // on the row count. Otherwise rows would silently be truncated or mix values from
        // different row positions.
        if let Some((first, rest)) = columns.split_first() {
            if let Some(other) = rest.iter().find(|c| c.len() != first.len()) {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    first.len(),
                    other.len()
                )));
            }
        }

        // For each dictionary column, intern its values and map every dictionary key to the
        // normalized bytes of its value. Non-dictionary columns map to `None`.
        let dictionaries = columns
            .iter()
            .zip(&mut self.interners)
            .zip(&self.fields)
            .map(|((column, interner), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }

                let values = downcast_dictionary_array! {
                    column => column.values(),
                    _ => return Ok(None)
                };

                let interner = interner.get_or_insert_with(Default::default);

                let mapping: Vec<_> = compute_dictionary_mapping(interner, values)?
                    .into_iter()
                    .map(|maybe_interned| {
                        maybe_interned.map(|interned| interner.normalized_key(interned))
                    })
                    .collect();

                Ok(Some(mapping))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut rows = new_empty_rows(columns, &dictionaries)?;

        for ((column, field), dictionary) in
            columns.iter().zip(&self.fields).zip(dictionaries)
        {
            encode_column(&mut rows, column, field.options, dictionary.as_deref())
        }

        // After encoding, every row end offset must have been advanced to its final position.
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic"));
        }

        Ok(rows)
    }
}
/// A set of rows encoded by a [`RowConverter`], stored contiguously in one buffer.
///
/// Row `i` occupies `buffer[offsets[i]..offsets[i + 1]]`.
#[derive(Debug)]
pub struct Rows {
    /// The concatenated bytes of every encoded row.
    buffer: Box<[u8]>,
    /// Row boundaries into `buffer`. There is one more entry than there are rows.
    offsets: Box<[usize]>,
}

impl Rows {
    /// Returns a borrowed view of the encoded row at index `row`.
    ///
    /// # Panics
    ///
    /// Panics if `row >= self.num_rows()`.
    pub fn row(&self, row: usize) -> Row<'_> {
        let end = self.offsets[row + 1];
        let bytes = &self.buffer[self.offsets[row]..end];
        Row(bytes)
    }

    /// Returns how many rows are stored.
    pub fn num_rows(&self) -> usize {
        let boundaries = self.offsets.len();
        boundaries - 1
    }
}

/// A borrowed view of a single encoded row.
///
/// The derived `Ord` compares the underlying bytes lexicographically, which is how rows
/// are ordered.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Row<'a>(&'a [u8]);

impl<'a> AsRef<[u8]> for Row<'a> {
    /// Returns the raw encoded bytes of this row.
    fn as_ref(&self) -> &[u8] {
        let Row(bytes) = *self;
        bytes
    }
}
/// Interns every value of a dictionary's `values` array into `interner`.
///
/// Returns one entry per dictionary value: the [`Interned`] handle for that value, or
/// `None` where the value is null. Primitive values are interned by their fixed-length
/// encoding; binary and string values are interned by their raw bytes.
///
/// # Errors
///
/// Returns [`ArrowError::NotYetImplemented`] for unsupported dictionary value types.
fn compute_dictionary_mapping(
    interner: &mut OrderPreservingInterner,
    values: &ArrayRef,
) -> Result<Vec<Option<Interned>>> {
    use fixed::FixedLengthEncoding;
    Ok(downcast_primitive_array! {
        values => interner
            .intern(values.iter().map(|x| x.map(|x| x.encode()))),
        // `Binary` arrays use 32-bit offsets. Downcasting them with the 64-bit
        // (`LargeBinary`) layout fails, so each offset width gets its own arm.
        DataType::Binary => {
            let iter = as_generic_binary_array::<i32>(values).iter();
            interner.intern(iter)
        }
        DataType::LargeBinary => {
            let iter = as_generic_binary_array::<i64>(values).iter();
            interner.intern(iter)
        }
        DataType::Utf8 => {
            let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
            interner.intern(iter)
        }
        DataType::LargeUtf8 => {
            let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
            interner.intern(iter)
        }
        t => return Err(ArrowError::NotYetImplemented(format!("dictionary value {} is not supported", t))),
    })
}
/// Allocates a zero-filled [`Rows`] large enough to hold the encoding of every row of
/// `cols`. Its offsets are seeded so that the column encoders can advance them as they write.
///
/// `dictionaries` is parallel to `cols`. For a dictionary column the entry must be `Some`,
/// mapping each dictionary key to the normalized bytes of its value (`None` for a null value).
///
/// The row count is taken from the first column, or zero if `cols` is empty.
///
/// # Errors
///
/// Returns [`ArrowError::NotYetImplemented`] for data types that have no row encoding.
fn new_empty_rows(
    cols: &[ArrayRef],
    dictionaries: &[Option<Vec<Option<&[u8]>>>],
) -> Result<Rows> {
    use fixed::FixedLengthEncoding;

    let num_rows = match cols.first() {
        Some(first) => first.len(),
        None => 0,
    };
    let mut row_lengths = vec![0; num_rows];

    for (col, dict) in cols.iter().zip(dictionaries) {
        downcast_primitive_array! {
            col => row_lengths.iter_mut().for_each(|len| *len += fixed::encoded_len(col)),
            DataType::Null => row_lengths.iter_mut().for_each(|len| *len += 1),
            DataType::Boolean => row_lengths.iter_mut().for_each(|len| *len += bool::ENCODED_LEN),
            DataType::Decimal128(_, _) => row_lengths.iter_mut().for_each(|len| *len += Decimal128::ENCODED_LEN),
            DataType::Decimal256(_, _) => row_lengths.iter_mut().for_each(|len| *len += Decimal256::ENCODED_LEN),
            DataType::Binary => as_generic_binary_array::<i32>(col)
                .iter()
                .zip(row_lengths.iter_mut())
                .for_each(|(value, len)| *len += variable::encoded_len(value)),
            DataType::LargeBinary => as_generic_binary_array::<i64>(col)
                .iter()
                .zip(row_lengths.iter_mut())
                .for_each(|(value, len)| *len += variable::encoded_len(value)),
            DataType::Utf8 => as_string_array(col)
                .iter()
                .zip(row_lengths.iter_mut())
                .for_each(|(value, len)| {
                    *len += variable::encoded_len(value.map(|s| s.as_bytes()))
                }),
            DataType::LargeUtf8 => as_largestring_array(col)
                .iter()
                .zip(row_lengths.iter_mut())
                .for_each(|(value, len)| {
                    *len += variable::encoded_len(value.map(|s| s.as_bytes()))
                }),
            DataType::Dictionary(_, _) => downcast_dictionary_array! {
                col => {
                    let mapping = dict.as_ref().unwrap();
                    for (key, len) in col.keys().iter().zip(row_lengths.iter_mut()) {
                        // One marker byte, followed by the normalized bytes for a non-null value.
                        *len += match key.and_then(|key| mapping[key as usize]) {
                            Some(bytes) => bytes.len() + 1,
                            None => 1,
                        };
                    }
                }
                _ => unreachable!(),
            }
            t => return Err(ArrowError::NotYetImplemented(format!("not yet implemented: {}", t)))
        }
    }

    // Offsets are seeded shifted down by one row: entry `i + 1` starts at the beginning of
    // row `i`. The encoders advance entry `i + 1` as they write row `i` (the dictionary path
    // in `encode_column` does exactly this). Once every column has been written, the
    // entries describe the row boundaries.
    let mut offsets = Vec::with_capacity(num_rows + 1);
    offsets.push(0);
    let total_len = row_lengths.into_iter().fold(0_usize, |start, len| {
        offsets.push(start);
        start.checked_add(len).expect("overflow")
    });

    Ok(Rows {
        buffer: vec![0_u8; total_len].into(),
        offsets: offsets.into(),
    })
}
/// Writes the encoding of `column` into `out`, one value per row.
///
/// Each row's bytes are written at the position currently stored in `out.offsets[i + 1]`,
/// and that offset is then advanced past the bytes written. The dictionary branch below
/// does this explicitly; the `fixed` and `variable` encoders are expected to follow the
/// same convention, and `convert_columns`' debug assertions check the final offsets.
///
/// For dictionary columns `dictionary` must be `Some`, mapping each key to the normalized
/// bytes of its value (`None` for a null value).
///
/// # Panics
///
/// Panics on data types without a row encoding. `convert_columns` calls `new_empty_rows`
/// first, and it returns an error for such types.
fn encode_column(
    out: &mut Rows,
    column: &ArrayRef,
    opts: SortOptions,
    dictionary: Option<&[Option<&[u8]>]>,
) {
    downcast_primitive_array! {
        column => fixed::encode(out, column, opts),
        DataType::Null => {
            fixed::encode(out, std::iter::repeat(None::<bool>).take(column.len()), opts)
        }
        DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts),
        DataType::Decimal128(_, _) => {
            let decimals = column.as_any().downcast_ref::<Decimal128Array>().unwrap();
            fixed::encode(out, decimals, opts)
        }
        DataType::Decimal256(_, _) => {
            let decimals = column.as_any().downcast_ref::<Decimal256Array>().unwrap();
            fixed::encode(out, decimals, opts)
        }
        DataType::Binary => {
            let values = as_generic_binary_array::<i32>(column).iter();
            variable::encode(out, values, opts)
        }
        DataType::LargeBinary => {
            let values = as_generic_binary_array::<i64>(column).iter();
            variable::encode(out, values, opts)
        }
        DataType::Utf8 => {
            let values = as_string_array(column).iter().map(|s| s.map(|s| s.as_bytes()));
            variable::encode(out, values, opts)
        }
        DataType::LargeUtf8 => {
            let values = as_largestring_array(column).iter().map(|s| s.map(|s| s.as_bytes()));
            variable::encode(out, values, opts)
        }
        DataType::Dictionary(_, _) => downcast_dictionary_array! {
            column => {
                let mapping = dictionary.unwrap();
                for (cursor, key) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
                    let start = *cursor;
                    match key.and_then(|key| mapping[key as usize]) {
                        Some(bytes) => {
                            // Marker byte 1, then the normalized bytes. For descending
                            // order every written byte, marker included, is inverted.
                            let end = start + 1 + bytes.len();
                            let slot = &mut out.buffer[start..end];
                            slot[0] = 1;
                            slot[1..].copy_from_slice(bytes);
                            if opts.descending {
                                slot.iter_mut().for_each(|b| *b = !*b);
                            }
                            *cursor = end;
                        }
                        None => {
                            // The buffer is zero-filled, so nulls-first only has to skip
                            // the byte. Nulls-last marks it 0xFF to sort after any value.
                            if !opts.nulls_first {
                                out.buffer[start] = 0xFF;
                            }
                            *cursor = start + 1;
                        }
                    }
                }
            },
            _ => unreachable!()
        }
        t => unimplemented!("not yet implemented: {}", t)
    }
}
// Tests for the row format:
// - hand-checked encodings for fixed-width, boolean, variable-width and dictionary columns;
// - a randomized check that row ordering agrees with `LexicographicalComparator`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{
        BinaryArray, BooleanArray, DictionaryArray, Float32Array, GenericStringArray,
        Int16Array, Int32Array, OffsetSizeTrait, PrimitiveArray,
        PrimitiveDictionaryBuilder, StringArray,
    };
    use crate::compute::{LexicographicalComparator, SortColumn};
    use crate::util::display::array_value_to_string;
    use rand::distributions::uniform::SampleUniform;
    use rand::distributions::{Distribution, Standard};
    use rand::{thread_rng, Rng};
    use std::sync::Arc;

    // Pins the exact bytes produced for an Int16 column plus a Float32 column (8 bytes per
    // row; the all-null row 2 encodes as zeros) and checks that row order follows the values.
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let mut converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ]);
        let rows = converter.convert_columns(&cols).unwrap();

        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer.as_ref(),
            &[
                1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
        );

        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4))
    }

    // With default options: null < false < true. Descending with nulls last reverses this.
    #[test]
    fn test_bool() {
        let mut converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]);

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)]));
        let rows = converter.convert_columns(&[col]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let mut converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )]);

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)]));
        let rows = converter.convert_columns(&[col]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
    }

    // Utf8 ordering with nulls and empty strings. Binary values around `BLOCK_SIZE`
    // boundaries must sort strictly increasing in input order, and strictly decreasing
    // when descending with nulls last.
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ]));

        let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
        let rows = converter.convert_columns(&[col]).unwrap();

        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        // Inputs are listed in strictly ascending order.
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let mut converter = RowConverter::new(vec![SortField::new(DataType::Binary)]);
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let mut converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )]);
        let rows = converter.convert_columns(&[col]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }
    }

    // Dictionary-encoded strings: equal values yield identical rows. Reusing the converter
    // keeps rows from separate `convert_columns` calls comparable.
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let mut converter =
            RowConverter::new(vec![SortField::new(a.data_type().clone())]);
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        // A second batch through the same converter: new values are interned alongside old ones.
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ]));

        let rows_b = converter.convert_columns(&[b]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let mut converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )]);

        let rows_c = converter.convert_columns(&[a]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));
    }

    // Int32-valued dictionaries order by value, and the null row (index 3) sorts first by default.
    #[test]
    fn test_primitive_dictionary() {
        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
        builder.append(2).unwrap();
        builder.append(3).unwrap();
        builder.append(0).unwrap();
        builder.append_null();
        builder.append(5).unwrap();
        builder.append(3).unwrap();
        builder.append(-1).unwrap();

        let a = builder.finish();

        let mut converter =
            RowConverter::new(vec![SortField::new(a.data_type().clone())]);
        let rows = converter.convert_columns(&[Arc::new(a)]).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(6) < rows.row(2));
        assert!(rows.row(3) < rows.row(6));
    }

    // A null key and a key pointing at a null dictionary value must encode identically.
    #[test]
    fn test_dictionary_nulls() {
        let values =
            Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None])
                .into_data();

        let data_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let mut converter = RowConverter::new(vec![SortField::new(data_type)]);
        let rows = converter
            .convert_columns(&[Arc::new(DictionaryArray::<Int32Type>::from(data))])
            .unwrap();

        assert_eq!(rows.row(0), rows.row(1));
        assert_eq!(rows.row(3), rows.row(4));
        assert_eq!(rows.row(4), rows.row(5));
        assert!(rows.row(3) < rows.row(0));
    }

    /// Random primitive array of `len` values; each value is non-null with probability `valid_percent`.
    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
    where
        K: ArrowPrimitiveType,
        Standard: Distribution<K::Native>,
    {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
            .collect()
    }

    /// Random ASCII strings of length 0..100; each is non-null with probability `valid_percent`.
    fn generate_strings<O: OffsetSizeTrait>(
        len: usize,
        valid_percent: f64,
    ) -> GenericStringArray<O> {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                rng.gen_bool(valid_percent).then(|| {
                    let len = rng.gen_range(0..100);
                    // Bytes below 128 are ASCII, so `from_utf8` cannot fail.
                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
                    String::from_utf8(bytes).unwrap()
                })
            })
            .collect()
    }

    /// Dictionary array over `values` with `len` keys drawn uniformly from
    /// `0..values.len()`; each key is non-null with probability `valid_percent`.
    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: ArrowDictionaryKeyType,
        K::Native: SampleUniform,
    {
        let mut rng = thread_rng();
        let min_key = K::Native::from_usize(0).unwrap();
        let max_key = K::Native::from_usize(values.len()).unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                rng.gen_bool(valid_percent)
                    .then(|| rng.gen_range(min_key..max_key))
            })
            .collect();

        let data_type = DataType::Dictionary(
            Box::new(K::DATA_TYPE),
            Box::new(values.data_type().clone()),
        );

        let data = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .add_child_data(values.data().clone())
            .build()
            .unwrap();

        DictionaryArray::from(data)
    }

    /// Random column of one of nine shapes: primitive, Utf8, or a string/Int64 dictionary.
    /// Requires `len >= 2` so the dictionary value count range `1..len` is non-empty.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = thread_rng();
        match rng.gen_range(0..9) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.gen_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            _ => unreachable!(),
        }
    }

    /// Formats the values of every column at `row` as a comma-separated string (for assertion messages).
    fn print_row(cols: &[SortColumn], row: usize) -> String {
        let t: Vec<_> = cols
            .iter()
            .map(|x| array_value_to_string(&x.values, row).unwrap())
            .collect();
        t.join(",")
    }

    /// Formats the data type of every column as a comma-separated string (for assertion messages).
    fn print_col_types(cols: &[SortColumn]) -> String {
        let t: Vec<_> = cols
            .iter()
            .map(|x| x.values.data_type().to_string())
            .collect();
        t.join(",")
    }

    // Randomized check: for random columns and sort options, comparing encoded rows must
    // agree with `LexicographicalComparator` for every pair of rows. Skipped under Miri.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = thread_rng();
            let num_columns = rng.gen_range(1..5);
            let len = rng.gen_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.gen_bool(0.5),
                    nulls_first: rng.gen_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let mut converter = RowConverter::new(columns);
            let rows = converter.convert_columns(&arrays).unwrap();

            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(&i, &j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }
        }
    }
}