use std::{
cmp::Ordering,
hash::{Hash, Hasher},
sync::Arc,
};
use crate::{
array::{Array, BinaryArray, BooleanArray, DictionaryArray, PrimitiveArray, Utf8Array},
datatypes::PhysicalType,
error::*,
};
use crate::{compute::sort::SortOptions, datatypes::DataType};
use self::{
dictionary::{compute_dictionary_mapping, encode_dictionary},
interner::OrderPreservingInterner,
};
mod dictionary;
mod fixed;
mod interner;
mod variable;
/// Converts columnar arrays into a row-oriented encoding ([`Rows`]) whose rows can be
/// compared byte-wise (see the `Ord` impl on [`Row`]).
///
/// Each input column is described by a [`SortField`], in the same order as the columns passed
/// to [`RowConverter::convert_columns`].
#[derive(Debug)]
pub struct RowConverter {
    /// Data type and sort options of every column, in column order.
    fields: Arc<[SortField]>,
    /// One lazily-created interner per column. Only dictionary-typed columns populate their slot
    /// (in `convert_columns`), and the slot is kept across calls.
    interners: Vec<Option<Box<OrderPreservingInterner>>>,
}
/// Describes one column handed to a [`RowConverter`].
///
/// A field holds the column's expected data type and the [`SortOptions`] that control how its
/// values are ordered in the row encoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Direction and null placement used when encoding this column.
    options: SortOptions,
    /// Data type that the corresponding column passed to `convert_columns` must have.
    data_type: DataType,
}

impl SortField {
    /// Creates a field for `data_type` using `SortOptions::default()`.
    pub fn new(data_type: DataType) -> Self {
        Self {
            options: SortOptions::default(),
            data_type,
        }
    }

    /// Creates a field for `data_type` with explicit sort `options`.
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        let field = Self { options, data_type };
        field
    }
}
impl RowConverter {
    /// Creates a converter for columns described by `fields`, in order.
    ///
    /// One empty interner slot is allocated per field; slots are filled lazily by
    /// [`RowConverter::convert_columns`] for dictionary-typed columns.
    pub fn new(fields: Vec<SortField>) -> Self {
        let interners = vec![None; fields.len()];
        Self {
            fields: fields.into(),
            interners,
        }
    }

    /// Encodes `columns` into [`Rows`], one row per input index.
    ///
    /// For dictionary-typed columns, the dictionary values are interned into this converter's
    /// per-column interner, which is created on first use and kept for later calls. Each
    /// value's normalized key is then used as its encoding.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidArgumentError`] in any of these cases:
    /// - the number of columns differs from the number of fields;
    /// - the columns do not all have the same length;
    /// - a column's data type differs from its field's.
    ///
    /// Errors from dictionary interning and from sizing the rows (e.g. unsupported physical
    /// types) are propagated.
    pub fn convert_columns(&mut self, columns: &[Box<dyn Array>]) -> Result<Rows> {
        if columns.len() != self.fields.len() {
            return Err(Error::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // `new_empty_rows` sizes the output from the first column only and zips the remaining
        // columns against it. A column of a different length would therefore produce
        // truncated or mis-sized rows instead of an error.
        if let Some(first) = columns.first() {
            let num_rows = first.len();
            if let Some(other) = columns.iter().find(|c| c.len() != num_rows) {
                return Err(Error::InvalidArgumentError(format!(
                    "RowConverter columns must have equal length, expected {} got {}",
                    num_rows,
                    other.len()
                )));
            }
        }

        // For every dictionary column, map each dictionary value index to its normalized
        // key (or `None`); other columns yield `None` here.
        let dictionaries = columns
            .iter()
            .zip(&mut self.interners)
            .zip(self.fields.iter())
            .map(|((column, interner), field)| {
                if column.data_type() != &field.data_type {
                    return Err(Error::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {:?} got {:?}",
                        field.data_type,
                        column.data_type()
                    )));
                }

                let values = match column.data_type().to_logical_type() {
                    DataType::Dictionary(k, _, _) => match_integer_type!(k, |$T| {
                        let column = column
                            .as_any()
                            .downcast_ref::<DictionaryArray<$T>>()
                            .unwrap();
                        column.values()
                    }),
                    _ => return Ok(None),
                };

                let interner = interner.get_or_insert_with(Default::default);

                let mapping = compute_dictionary_mapping(interner, values)?
                    .into_iter()
                    .map(|maybe_interned| {
                        maybe_interned.map(|interned| interner.normalized_key(interned))
                    })
                    .collect::<Vec<_>>();

                Ok(Some(mapping))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut rows = new_empty_rows(columns, &dictionaries)?;

        // Append each column's encoding to every row, in field order.
        for ((column, field), dictionary) in
            columns.iter().zip(self.fields.iter()).zip(dictionaries)
        {
            encode_column(&mut rows, column, field.options, dictionary.as_deref())
        }

        Ok(rows)
    }
}
/// The row-encoded form of a batch of columns, produced by [`RowConverter::convert_columns`].
///
/// Row `i` occupies `buffer[offsets[i]..offsets[i + 1]]`, so `offsets` always holds
/// `len() + 1` entries.
#[derive(Debug)]
pub struct Rows {
    /// Concatenated encoded bytes of every row.
    buffer: Box<[u8]>,
    /// Row boundaries into `buffer`; entry `i` is the start of row `i`.
    offsets: Box<[usize]>,
}

impl Rows {
    /// Returns the row at index `row`.
    ///
    /// # Panics
    ///
    /// Panics if `row >= self.len()`, or if the stored offsets do not describe a valid range
    /// of `buffer`.
    pub fn row(&self, row: usize) -> Row<'_> {
        let end = self.offsets[row + 1];
        let start = self.offsets[row];
        // Checked slicing: this safe accessor must not depend on invariants upheld elsewhere.
        // Callers needing the unchecked fast path use `row_unchecked`.
        Row {
            data: &self.buffer[start..end],
        }
    }

    /// Returns the row at index `row` without bounds checking.
    ///
    /// The caller must ensure `row < self.len()`. In debug builds this is checked with a
    /// `debug_assert!`.
    ///
    /// NOTE(review): this is a safe `fn`, yet an out-of-range `row` is undefined behaviour in
    /// release builds. It should arguably be an `unsafe fn`; it is kept safe here so existing
    /// callers keep compiling.
    pub fn row_unchecked(&self, row: usize) -> Row<'_> {
        debug_assert!(
            row < self.len(),
            "row index {} out of bounds (len {})",
            row,
            self.len()
        );
        // SAFETY: relies on the caller passing `row < self.len()`, so that
        // `row + 1 < self.offsets.len()`. It also relies on the offsets being non-decreasing
        // and within `buffer`, as established when the `Rows` was built.
        let data = unsafe {
            let end = *self.offsets.get_unchecked(row + 1);
            let start = *self.offsets.get_unchecked(row);
            self.buffer.get_unchecked(start..end)
        };
        Row { data }
    }

    /// Returns the number of rows.
    #[inline]
    pub fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns `true` if there are no rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns an iterator over all rows, in index order.
    #[inline]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }
}

impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    /// Iterates over every row, from index `0` to `len() - 1`.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        RowsIter {
            rows: self,
            start: 0,
            end: self.len(),
        }
    }
}
/// Double-ended iterator over the [`Row`]s of a [`Rows`].
///
/// Yields the rows with indices in `start..end`: `next` consumes from the front and
/// `next_back` from the back.
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated.
    rows: &'a Rows,
    /// Index of the next row returned by `next` (inclusive).
    start: usize,
    /// One past the index of the next row returned by `next_back` (exclusive).
    end: usize,
}

impl<'a> Iterator for RowsIter<'a> {
    type Item = Row<'a>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.start < self.end {
            // `end` starts at `rows.len()` and only decreases, so `start < end <= rows.len()`
            // keeps the index in range for `row_unchecked`.
            let row = self.rows.row_unchecked(self.start);
            self.start += 1;
            Some(row)
        } else {
            None
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.len();
        (len, Some(len))
    }
}

impl<'a> ExactSizeIterator for RowsIter<'a> {
    #[inline]
    fn len(&self) -> usize {
        self.end - self.start
    }
}

impl<'a> DoubleEndedIterator for RowsIter<'a> {
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is exclusive: step back first, then read the row it now points at.
        // Reading `row(self.end)` before decrementing would return the wrong row and index
        // past the last offset on the first call.
        self.end -= 1;
        Some(self.rows.row(self.end))
    }
}
// SAFETY: `RowsIter::size_hint` reports `end - start` as both its lower and upper bound, and
// each call to `next`/`next_back` that returns `Some` shrinks `start..end` by exactly one, so
// the reported length is exact.
unsafe impl<'a> crate::trusted_len::TrustedLen for RowsIter<'a> {}
/// A borrowed view of a single encoded row inside a [`Rows`].
///
/// Equality, ordering and hashing are all defined on the raw encoded bytes. Two rows
/// therefore compare exactly as their byte slices compare lexicographically.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row.
    data: &'a [u8],
}

impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data == other.data
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    /// Byte slices are totally ordered, so this always agrees with [`Ord::cmp`].
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(self.data, other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(self.data, state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    /// Returns the encoded bytes of this row.
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
/// Returns the byte written in place of a value's validity marker when that value is null.
///
/// With `nulls_first` the sentinel is `0x00`, which sorts before every other byte.
/// Otherwise it is `0xFF`, which sorts after every other byte.
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    if options.nulls_first {
        0
    } else {
        0xFF
    }
}
/// Dispatches on a `PrimitiveType` value, running `$body` with `$T` bound to the matching
/// native Rust type.
///
/// Usage: `with_match_primitive_without_interval_type!(primitive, |$T| { ... })`. The caller's
/// literal `$` token is captured as `$_` so that an inner helper macro (`__with_ty__`) can be
/// defined that substitutes each concrete type for `$T`.
///
/// Supported variants:
/// - signed integers `Int8`..`Int256`;
/// - unsigned integers `UInt8`..`UInt64`;
/// - floats `Float16`, `Float32`, `Float64`.
///
/// # Panics
///
/// Panics via `unimplemented!` for any other variant. Given the macro's name, these are
/// presumably the interval types.
#[macro_export]
macro_rules! with_match_primitive_without_interval_type {(
    $key_type:expr, | $_:tt $T:ident | $($body:tt)*
) => ({
    macro_rules! __with_ty__ {( $_ $T:ident ) => ( $($body)* )}
    use $crate::datatypes::PrimitiveType::*;
    use $crate::types::{f16, i256};
    match $key_type {
        Int8 => __with_ty__! { i8 },
        Int16 => __with_ty__! { i16 },
        Int32 => __with_ty__! { i32 },
        Int64 => __with_ty__! { i64 },
        Int128 => __with_ty__! { i128 },
        Int256 => __with_ty__! { i256 },
        UInt8 => __with_ty__! { u8 },
        UInt16 => __with_ty__! { u16 },
        UInt32 => __with_ty__! { u32 },
        UInt64 => __with_ty__! { u64 },
        Float16 => __with_ty__! { f16 },
        Float32 => __with_ty__! { f32 },
        Float64 => __with_ty__! { f64 },
        _ => unimplemented!("Unsupported type: {:?}", $key_type),
    }
})}
fn new_empty_rows(
cols: &[Box<dyn Array>],
dictionaries: &[Option<Vec<Option<&[u8]>>>],
) -> Result<Rows> {
use fixed::FixedLengthEncoding;
let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
let mut lengths = vec![0; num_rows];
for (array, dict) in cols.iter().zip(dictionaries) {
match array.data_type().to_physical_type() {
PhysicalType::Primitive(primitive) => {
with_match_primitive_without_interval_type!(primitive, |$T| {
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<$T>>()
.unwrap();
lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array))
})
}
PhysicalType::Null => {}
PhysicalType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
PhysicalType::Binary => array
.as_any()
.downcast_ref::<BinaryArray<i32>>()
.unwrap()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| *length += variable::encoded_len(slice)),
PhysicalType::LargeBinary => array
.as_any()
.downcast_ref::<BinaryArray<i64>>()
.unwrap()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| *length += variable::encoded_len(slice)),
PhysicalType::Utf8 => array
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.unwrap()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| {
*length += variable::encoded_len(slice.map(|x| x.as_bytes()))
}),
PhysicalType::LargeUtf8 => array
.as_any()
.downcast_ref::<Utf8Array<i64>>()
.unwrap()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| {
*length += variable::encoded_len(slice.map(|x| x.as_bytes()))
}),
PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
let array = array
.as_any()
.downcast_ref::<DictionaryArray<$T>>()
.unwrap();
let dict = dict.as_ref().unwrap();
for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
match v.and_then(|v| dict[*v as usize]) {
Some(k) => *length += k.len() + 1,
None => *length += 1,
}
}
}),
t => {
return Err(Error::NotYetImplemented(format!(
"not yet implemented: {:?}",
t
)))
}
}
}
let mut offsets = Vec::with_capacity(num_rows + 1);
offsets.push(0);
let mut cur_offset = 0_usize;
for l in lengths {
offsets.push(cur_offset);
cur_offset = cur_offset.checked_add(l).expect("overflow");
}
let buffer = vec![0_u8; cur_offset];
Ok(Rows {
buffer: buffer.into(),
offsets: offsets.into(),
})
}
/// Appends the encoding of `column` to every row in `out`, honouring `opts`.
///
/// `dictionary` must be `Some` for dictionary-typed columns. It holds the optional normalized
/// key of each dictionary value, as computed by [`RowConverter::convert_columns`].
///
/// # Panics
///
/// - Panics if `column`'s physical type is not supported.
/// - Panics if `dictionary` is `None` for a dictionary column.
fn encode_column(
    out: &mut Rows,
    column: &Box<dyn Array>,
    opts: SortOptions,
    dictionary: Option<&[Option<&[u8]>]>,
) {
    // Every arm downcasts the same type-erased array, so fetch `as_any` once.
    let any = column.as_any();

    match column.data_type().to_physical_type() {
        PhysicalType::Primitive(primitive) => {
            with_match_primitive_without_interval_type!(primitive, |$T| {
                let array = any.downcast_ref::<PrimitiveArray<$T>>().unwrap();
                let values = array.iter().map(|v| v.copied());
                fixed::encode(out, values, opts);
            })
        }
        // Null columns contribute no bytes.
        PhysicalType::Null => {}
        PhysicalType::Boolean => {
            let array = any.downcast_ref::<BooleanArray>().unwrap();
            fixed::encode(out, array, opts);
        }
        PhysicalType::Binary => {
            let array = any.downcast_ref::<BinaryArray<i32>>().unwrap();
            variable::encode(out, array.iter(), opts);
        }
        PhysicalType::LargeBinary => {
            let array = any.downcast_ref::<BinaryArray<i64>>().unwrap();
            variable::encode(out, array.iter(), opts);
        }
        PhysicalType::Utf8 => {
            let array = any.downcast_ref::<Utf8Array<i32>>().unwrap();
            let bytes = array.iter().map(|s| s.map(str::as_bytes));
            variable::encode(out, bytes, opts);
        }
        PhysicalType::LargeUtf8 => {
            let array = any.downcast_ref::<Utf8Array<i64>>().unwrap();
            let bytes = array.iter().map(|s| s.map(str::as_bytes));
            variable::encode(out, bytes, opts);
        }
        PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
            let array = any.downcast_ref::<DictionaryArray<$T>>().unwrap();
            encode_dictionary(out, array, dictionary.unwrap(), opts);
        }),
        other => unimplemented!("not yet implemented: {:?}", other),
    }
}
// Unit tests for exact encodings plus a randomized test that row ordering matches the
// column-wise lexicographic ordering computed by `build_compare`.
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use rand::{
        distributions::{uniform::SampleUniform, Distribution, Standard},
        thread_rng, Rng,
    };

    use super::*;
    use crate::{
        array::{Array, DictionaryKey, Float32Array, Int16Array, NullArray},
        compute::sort::build_compare,
        datatypes::DataType,
        offset::Offset,
        types::NativeType,
    };

    /// Checks the exact offsets and bytes produced for an i16 column and an f32 column with
    /// default options. Also checks that row comparisons follow the column-wise ordering.
    #[test]
    fn test_fixed_width() {
        let cols = [
            Int16Array::from([Some(1), Some(2), None, Some(-5), Some(2), Some(2), Some(0)])
                .to_boxed(),
            Float32Array::from([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])
            .to_boxed(),
        ];

        let mut converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ]);

        let rows = converter.convert_columns(&cols).unwrap();

        // Every row is 8 bytes here: 3 for the i16 column and 5 for the f32 column, as the
        // expected buffer below shows.
        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer.as_ref(),
            &[
                1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
        );

        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));
    }

    /// A `Null`-typed column produces one empty row per input element.
    #[test]
    fn test_null_encoding() {
        let col = NullArray::new(DataType::Null, 10).to_boxed();
        let mut converter = RowConverter::new(vec![SortField::new(DataType::Null)]);
        let rows = converter.convert_columns(&[col]).unwrap();
        assert_eq!(rows.len(), 10);
        assert_eq!(rows.row(1).data.len(), 0);
    }

    /// Random primitive array of `len` elements; each is valid with probability `valid_percent`.
    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
    where
        K: NativeType,
        Standard: Distribution<K>,
    {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
            .collect()
    }

    /// Random UTF-8 array of `len` elements. Each valid element is 0..100 ASCII bytes
    /// (`0..128`), and each is valid with probability `valid_percent`.
    fn generate_strings<O: Offset>(len: usize, valid_percent: f64) -> Utf8Array<O> {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                rng.gen_bool(valid_percent).then(|| {
                    let len = rng.gen_range(0..100);
                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
                    String::from_utf8(bytes).unwrap()
                })
            })
            .collect()
    }

    /// Random dictionary array of `len` keys drawn uniformly from `0..values.len()`; each key
    /// is valid with probability `valid_percent`.
    fn generate_dictionary<K>(
        values: Box<dyn Array>,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: DictionaryKey + Ord + SampleUniform,
        <K as TryFrom<usize>>::Error: Debug,
    {
        let mut rng = thread_rng();
        let min_key = 0_usize.try_into().unwrap();
        let max_key = values.len().try_into().unwrap();
        let keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                rng.gen_bool(valid_percent)
                    .then(|| rng.gen_range(min_key..max_key))
            })
            .collect();

        DictionaryArray::try_from_keys(keys, values).unwrap()
    }

    /// Random column of `len` rows whose type is picked uniformly from:
    /// - integer and float primitives;
    /// - `Utf8`;
    /// - dictionaries over strings or `i64` values.
    fn generate_column(len: usize) -> Box<dyn Array> {
        let mut rng = thread_rng();
        match rng.gen_range(0..9) {
            0 => Box::new(generate_primitive_array::<i32>(len, 0.8)),
            1 => Box::new(generate_primitive_array::<u32>(len, 0.8)),
            2 => Box::new(generate_primitive_array::<i64>(len, 0.8)),
            3 => Box::new(generate_primitive_array::<u64>(len, 0.8)),
            4 => Box::new(generate_primitive_array::<f32>(len, 0.8)),
            5 => Box::new(generate_primitive_array::<f64>(len, 0.8)),
            6 => Box::new(generate_strings::<i32>(len, 0.8)),
            7 => Box::new(generate_dictionary::<i64>(
                // Cannot test dictionaries containing null values because of #2687
                Box::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Box::new(generate_dictionary::<i64>(
                // Cannot test dictionaries containing null values because of #2687
                Box::new(generate_primitive_array::<i64>(rng.gen_range(1..len), 1.0)),
                len,
                0.8,
            )),
            _ => unreachable!(),
        }
    }

    /// For random columns and random sort options, checks that comparing encoded rows gives
    /// the same result as comparing the inputs column by column with `build_compare`, for
    /// every pair of rows.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = thread_rng();
            let num_columns = rng.gen_range(1..5);
            let len = rng.gen_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.gen_bool(0.5),
                    nulls_first: rng.gen_bool(0.5),
                })
                .collect();

            let comparators = arrays
                .iter()
                .zip(options.iter())
                .map(|(a, o)| build_compare(&**a, *o).unwrap())
                .collect::<Vec<_>>();

            let columns = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let mut converter = RowConverter::new(columns);
            let rows = converter.convert_columns(&arrays).unwrap();
            // Reference ordering: the first column comparator that reports a difference wins.
            let cmp = |i, j| {
                for cmp in comparators.iter() {
                    let cmp = cmp(i, j);
                    if cmp != Ordering::Equal {
                        return cmp;
                    }
                }
                Ordering::Equal
            };

            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = cmp(i, j);
                    assert_eq!(row_cmp, lex_cmp);
                }
            }
        }
    }
}