#![doc(
html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::iter::Map;
use std::slice::Windows;
use std::sync::Arc;
use arrow_array::cast::*;
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, ByteViewType};
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};
use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};
mod fixed;
mod list;
mod run;
mod variable;
/// Converts columns of Arrow arrays into a row-oriented binary format that
/// can be compared byte-wise.
///
/// Holds one [`SortField`] describing each column and a matching [`Codec`]
/// carrying any per-field encoding state (child converters, pre-encoded
/// null rows).
#[derive(Debug)]
pub struct RowConverter {
    /// The schema (data type + sort options) of the columns to convert,
    /// shared with every [`Rows`] this converter produces
    fields: Arc<[SortField]>,
    /// One codec per field, in the same order as `fields`
    codecs: Vec<Codec>,
}
/// Per-field encoding state created by [`Codec::new`] from a [`SortField`].
///
/// Nested and dictionary types carry a child [`RowConverter`] and, where the
/// encoding needs it, a pre-encoded [`OwnedRow`] representing a null value.
#[derive(Debug)]
enum Codec {
    /// No state is needed: the field is encoded by the stateless
    /// fixed/variable-width encoders
    Stateless,
    /// Converter for the dictionary values, plus the encoded row of a null value
    Dictionary(RowConverter, OwnedRow),
    /// Converter spanning all struct children, plus the encoded row of an
    /// all-null struct value
    Struct(RowConverter, OwnedRow),
    /// Converter for the child element type (used for all list-like types)
    List(RowConverter),
    /// Converter for the run values of a run-end encoded array
    RunEndEncoded(RowConverter),
    /// One converter per union child, the type id of each child (parallel to
    /// the converters), and each child's encoded null row
    Union(Vec<RowConverter>, Vec<i8>, Vec<OwnedRow>),
}
/// Computes the minimal `(start, end)` range of the child `values` referenced
/// by any non-empty view in `array`.
///
/// Unlike list offsets, list-view offset/size pairs need not be contiguous or
/// ordered, so the referenced range must be scanned for. Returns `(0, 0)` for
/// an empty array or when every view has size zero.
fn compute_list_view_bounds<O: OffsetSizeTrait>(array: &GenericListViewArray<O>) -> (usize, usize) {
    if array.is_empty() {
        return (0, 0);
    }
    let values_len = array.values().len();
    let mut start = usize::MAX;
    let mut end = 0usize;
    for (offset, size) in array
        .value_offsets()
        .iter()
        .zip(array.value_sizes())
        .take(array.len())
    {
        let (offset, size) = (offset.as_usize(), size.as_usize());
        // Zero-sized views reference no values and must not widen the bounds
        if size > 0 {
            start = start.min(offset);
            end = end.max(offset + size);
        }
        // Stop early once the bounds already cover the entire child array
        if start == 0 && end == values_len {
            break;
        }
    }
    if start == usize::MAX {
        // Every view was empty
        (0, 0)
    } else {
        (start, end)
    }
}
impl Codec {
    /// Builds the [`Codec`] for a single [`SortField`] based on its data type.
    ///
    /// Nested and dictionary types recursively construct child
    /// [`RowConverter`]s and, where needed, pre-encode the row for a null
    /// value so encoding can emit nulls without re-running the child
    /// converter.
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Child converter over the dictionary values, plus the
                // pre-encoded form of a single null value
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);
                let converter = RowConverter::new(vec![sort_field])?;
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Child values are always encoded ascending; the null
                // ordering is flipped (XOR) when the parent sorts descending
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };
                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All non-nested types are handled by the stateless encoders
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f)
            | DataType::LargeList(f)
            | DataType::ListView(f)
            | DataType::LargeListView(f) => {
                // As for RunEndEncoded: encode children ascending, with the
                // null ordering adjusted relative to the parent's direction
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };
                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Fixed-size lists pass the parent's options through unchanged
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // One child converter spanning all struct fields, plus the
                // pre-encoded row of an all-null struct value
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();
                let converter = RowConverter::new(sort_fields)?;
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Struct(converter, owned))
            }
            DataType::Union(fields, _mode) => {
                // One converter and encoded null row per union child, kept in
                // parallel vectors alongside each child's type id
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };
                let mut converters = Vec::with_capacity(fields.len());
                let mut type_ids = Vec::with_capacity(fields.len());
                let mut null_rows = Vec::with_capacity(fields.len());
                for (type_id, field) in fields.iter() {
                    let sort_field =
                        SortField::new_with_options(field.data_type().clone(), options);
                    let converter = RowConverter::new(vec![sort_field])?;
                    let null_array = new_null_array(field.data_type(), 1);
                    let nulls = converter.convert_columns(&[null_array])?;
                    let owned = OwnedRow {
                        data: nulls.buffer.into(),
                        config: nulls.config,
                    };
                    converters.push(converter);
                    type_ids.push(type_id);
                    null_rows.push(owned);
                }
                Ok(Self::Union(converters, type_ids, null_rows))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepares the per-array [`Encoder`] for `array`, converting any child
    /// values up-front so that encoding each parent row is a plain copy of
    /// pre-computed bytes.
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                // Only convert the child values actually referenced by this
                // (possibly sliced) array
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::ListView(_) => {
                        // List views need not be contiguous: compute the
                        // bounding range over all referenced values
                        let list_view_array = array.as_list_view::<i32>();
                        let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
                        list_view_array
                            .values()
                            .slice(min_offset, max_end - min_offset)
                    }
                    DataType::LargeListView(_) => {
                        let list_view_array = array.as_list_view::<i64>();
                        let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
                        list_view_array
                            .values()
                            .slice(min_offset, max_end - min_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values_slice(),
                        DataType::Int32 => array.as_run::<Int32Type>().values_slice(),
                        DataType::Int64 => array.as_run::<Int64Type>().values_slice(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(&values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
            Codec::Union(converters, field_to_type_ids, _) => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected Union array");
                let type_ids = union_array.type_ids().clone();
                let offsets = union_array.offsets().cloned();
                // Convert every child array up-front; encoding later selects
                // the pre-computed row by type id (and offset, if dense)
                let mut child_rows = Vec::with_capacity(converters.len());
                for (field_idx, converter) in converters.iter().enumerate() {
                    let type_id = field_to_type_ids[field_idx];
                    let child_array = union_array.child(type_id);
                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                    child_rows.push(rows);
                }
                Ok(Encoder::Union {
                    child_rows,
                    field_to_type_ids: field_to_type_ids.clone(),
                    type_ids,
                    offsets,
                })
            }
        }
    }

    /// Returns the heap memory retained by this codec's child converters and
    /// pre-encoded null rows, in bytes.
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
            Codec::Union(converters, _, null_rows) => {
                converters.iter().map(|c| c.size()).sum::<usize>()
                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
            }
        }
    }
}
/// Per-array encoding state produced by [`Codec::encoder`], borrowing any
/// pre-encoded null rows from the codec.
#[derive(Debug)]
enum Encoder<'a> {
    /// No per-array state required
    Stateless,
    /// Converted rows for the dictionary values, plus the encoded null row
    Dictionary(Rows, Row<'a>),
    /// Converted rows for the struct children, plus the encoded null row
    Struct(Rows, Row<'a>),
    /// Converted rows for the list child values
    List(Rows),
    /// Converted rows for the run values
    RunEndEncoded(Rows),
    /// Converted rows for every union child
    Union {
        /// One [`Rows`] per union child, parallel to `field_to_type_ids`
        child_rows: Vec<Rows>,
        /// The type id of each child, indexed by child position
        field_to_type_ids: Vec<i8>,
        /// The per-row type ids of the union array being encoded
        type_ids: ScalarBuffer<i8>,
        /// Per-row child offsets — `Some` for dense unions, `None` for sparse
        offsets: Option<ScalarBuffer<i32>>,
    },
}
/// Configuration for a single column handled by a [`RowConverter`]: its data
/// type and how it participates in ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// The ordering options (direction, null placement) for this column
    options: SortOptions,
    /// The data type of this column
    data_type: DataType,
}
impl SortField {
pub fn new(data_type: DataType) -> Self {
Self::new_with_options(data_type, Default::default())
}
pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
Self { options, data_type }
}
pub fn size(&self) -> usize {
self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
}
}
impl RowConverter {
pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
if !Self::supports_fields(&fields) {
return Err(ArrowError::NotYetImplemented(format!(
"Row format support not yet implemented for: {fields:?}"
)));
}
let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
Ok(Self {
fields: fields.into(),
codecs,
})
}
pub fn supports_fields(fields: &[SortField]) -> bool {
fields.iter().all(|x| Self::supports_datatype(&x.data_type))
}
fn supports_datatype(d: &DataType) -> bool {
match d {
_ if !d.is_nested() => true,
DataType::List(f)
| DataType::LargeList(f)
| DataType::ListView(f)
| DataType::LargeListView(f)
| DataType::FixedSizeList(f, _) => Self::supports_datatype(f.data_type()),
DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
DataType::Union(fs, _mode) => fs
.iter()
.all(|(_, f)| Self::supports_datatype(f.data_type())),
_ => false,
}
}
pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
let mut rows = self.empty_rows(num_rows, 0);
self.append(&mut rows, columns)?;
Ok(rows)
}
pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
assert!(
Arc::ptr_eq(&rows.config.fields, &self.fields),
"rows were not produced by this RowConverter"
);
if columns.len() != self.fields.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Incorrect number of arrays provided to RowConverter, expected {} got {}",
self.fields.len(),
columns.len()
)));
}
for colum in columns.iter().skip(1) {
if colum.len() != columns[0].len() {
return Err(ArrowError::InvalidArgumentError(format!(
"RowConverter columns must all have the same length, expected {} got {}",
columns[0].len(),
colum.len()
)));
}
}
let encoders = columns
.iter()
.zip(&self.codecs)
.zip(self.fields.iter())
.map(|((column, codec), field)| {
if !column.data_type().equals_datatype(&field.data_type) {
return Err(ArrowError::InvalidArgumentError(format!(
"RowConverter column schema mismatch, expected {} got {}",
field.data_type,
column.data_type()
)));
}
codec.encoder(column.as_ref())
})
.collect::<Result<Vec<_>, _>>()?;
let write_offset = rows.num_rows();
let lengths = row_lengths(columns, &encoders);
let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
rows.buffer.resize(total, 0);
for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
encode_column(
&mut rows.buffer,
&mut rows.offsets[write_offset..],
column.as_ref(),
field.options,
&encoder,
)
}
if cfg!(debug_assertions) {
assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
rows.offsets
.windows(2)
.for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
}
Ok(())
}
pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
where
I: IntoIterator<Item = Row<'a>>,
{
let mut validate_utf8 = false;
let mut rows: Vec<_> = rows
.into_iter()
.map(|row| {
assert!(
Arc::ptr_eq(&row.config.fields, &self.fields),
"rows were not produced by this RowConverter"
);
validate_utf8 |= row.config.validate_utf8;
row.data
})
.collect();
let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
if cfg!(debug_assertions) {
for (i, row) in rows.iter().enumerate() {
if !row.is_empty() {
return Err(ArrowError::InvalidArgumentError(format!(
"Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
codecs = &self.codecs
)));
}
}
}
Ok(result)
}
pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
offsets.push(0);
Rows {
offsets,
buffer: Vec::with_capacity(data_capacity),
config: RowConfig {
fields: self.fields.clone(),
validate_utf8: false,
},
}
}
pub fn from_binary(&self, array: BinaryArray) -> Rows {
assert_eq!(
array.null_count(),
0,
"can't construct Rows instance from array with nulls"
);
let (offsets, values, _) = array.into_parts();
let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
Rows {
buffer,
offsets,
config: RowConfig {
fields: Arc::clone(&self.fields),
validate_utf8: true,
},
}
}
unsafe fn convert_raw(
&self,
rows: &mut [&[u8]],
validate_utf8: bool,
) -> Result<Vec<ArrayRef>, ArrowError> {
self.fields
.iter()
.zip(&self.codecs)
.map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
.collect()
}
pub fn parser(&self) -> RowParser {
RowParser::new(Arc::clone(&self.fields))
}
pub fn size(&self) -> usize {
std::mem::size_of::<Self>()
+ self.fields.iter().map(|x| x.size()).sum::<usize>()
+ self.codecs.capacity() * std::mem::size_of::<Codec>()
+ self.codecs.iter().map(Codec::size).sum::<usize>()
}
}
/// Parses raw bytes as [`Row`]s for a particular [`RowConverter`]'s schema.
///
/// Created via [`RowConverter::parser`]; always validates UTF-8 on decode
/// because the bytes it parses are supplied externally.
#[derive(Debug)]
pub struct RowParser {
    /// The schema and validation settings shared with produced [`Row`]s
    config: RowConfig,
}
impl RowParser {
    /// Creates a parser for the given fields; parsed rows always request
    /// UTF-8 validation since their bytes originate outside the converter.
    fn new(fields: Arc<[SortField]>) -> Self {
        let config = RowConfig {
            fields,
            validate_utf8: true,
        };
        Self { config }
    }

    /// Wraps `bytes` as a [`Row`] without copying.
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
        Row {
            config: &self.config,
            data: bytes,
        }
    }
}
/// Configuration shared between a [`RowConverter`] and the [`Rows`] / [`Row`]
/// values it produces; used to verify rows are paired with the converter that
/// created them (via `Arc::ptr_eq` on `fields`).
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema of the rows
    fields: Arc<[SortField]>,
    /// Whether string data must be UTF-8 validated on decode (set when row
    /// bytes were supplied externally, e.g. via `from_binary` or `parse`)
    validate_utf8: bool,
}
/// A collection of encoded rows stored contiguously.
#[derive(Debug)]
pub struct Rows {
    /// The encoded bytes of all rows, concatenated
    buffer: Vec<u8>,
    /// Byte offsets into `buffer`: row `i` occupies
    /// `buffer[offsets[i]..offsets[i + 1]]`, so there is always one more
    /// offset than there are rows
    offsets: Vec<usize>,
    /// The schema/validation configuration shared with produced [`Row`]s
    config: RowConfig,
}
/// Iterator over the byte length of each row in a [`Rows`], as returned by
/// [`Rows::lengths`].
pub type RowLengthIter<'a> = Map<Windows<'a, usize>, fn(&'a [usize]) -> usize>;
impl Rows {
    /// Appends a single [`Row`] to this collection.
    ///
    /// # Panics
    /// Panics if `row` was not produced by the same [`RowConverter`] as this
    /// [`Rows`].
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Propagate the validation requirement from the pushed row
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Reserves capacity for at least `row_capacity` additional rows and
    /// `data_capacity` additional bytes of row data.
    pub fn reserve(&mut self, row_capacity: usize, data_capacity: usize) {
        self.buffer.reserve(data_capacity);
        self.offsets.reserve(row_capacity);
    }

    /// Returns the row at index `row`.
    ///
    /// # Panics
    /// Panics if `row` is out of bounds.
    pub fn row(&self, row: usize) -> Row<'_> {
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking.
    ///
    /// # Safety
    /// The caller must ensure `index < self.num_rows()`.
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Returns the encoded byte length of the row at index `row`.
    ///
    /// # Panics
    /// Panics if `row` is out of bounds.
    pub fn row_len(&self, row: usize) -> usize {
        assert!(row + 1 < self.offsets.len());
        self.offsets[row + 1] - self.offsets[row]
    }

    /// Returns an iterator over the byte length of each row.
    pub fn lengths(&self) -> RowLengthIter<'_> {
        self.offsets.windows(2).map(|w| w[1] - w[0])
    }

    /// Removes all rows, keeping the allocated capacity.
    pub fn clear(&mut self) {
        // Keep the leading sentinel offset of 0
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of rows.
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the rows.
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the total memory retained by this [`Rows`], in bytes.
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.capacity()
            + self.offsets.capacity() * std::mem::size_of::<usize>()
    }

    /// Converts this [`Rows`] into a [`BinaryArray`] without copying the row
    /// data, for later reconstruction via [`RowConverter::from_binary`].
    ///
    /// # Errors
    /// Returns [`ArrowError::InvalidArgumentError`] if the row data exceeds
    /// `i32::MAX` bytes and therefore cannot be indexed by `i32` offsets.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets are monotonic (verified in debug builds by
        // `append`) and bounded by the buffer length checked above
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    /// Iterates over all rows, front to back.
    fn into_iter(self) -> Self::IntoIter {
        let end = self.num_rows();
        RowsIter {
            rows: self,
            start: 0,
            end,
        }
    }
}
/// Double-ended iterator over the [`Row`]s in a [`Rows`].
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The collection being iterated
    rows: &'a Rows,
    /// Index of the next row yielded from the front
    start: usize,
    /// One past the index of the next row yielded from the back
    end: usize,
}
impl<'a> Iterator for RowsIter<'a> {
    type Item = Row<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.start >= self.end {
            return None;
        }
        let idx = self.start;
        self.start += 1;
        // SAFETY: idx < self.end <= rows.num_rows(), so the row exists
        Some(unsafe { self.rows.row_unchecked(idx) })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact: the remaining count is always known
        (self.len(), Some(self.len()))
    }
}
impl ExactSizeIterator for RowsIter<'_> {
    /// Number of rows remaining (`start` never exceeds `end`).
    fn len(&self) -> usize {
        self.end - self.start
    }
}
impl DoubleEndedIterator for RowsIter<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.start == self.end {
            return None;
        }
        self.end -= 1;
        // SAFETY: start <= end < rows.num_rows() after the decrement
        let row = unsafe { self.rows.row_unchecked(self.end) };
        Some(row)
    }
}
/// A borrowed view of a single encoded row; comparing rows byte-wise yields
/// the ordering defined by the originating [`RowConverter`]'s sort fields.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The configuration of the converter that produced this row
    config: &'a RowConfig,
}
impl<'a> Row<'a> {
    /// Copies this row into an [`OwnedRow`], cloning its configuration.
    pub fn owned(&self) -> OwnedRow {
        OwnedRow {
            config: self.config.clone(),
            data: Box::from(self.data),
        }
    }

    /// Returns the encoded bytes of this row.
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
// Equality, ordering, and hashing delegate to the raw encoded bytes: the row
// format is constructed so that byte-wise comparison matches the configured
// sort order.
impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}
impl Eq for Row<'_> {}
impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.data.cmp(other.data)
    }
}
impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}
impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
/// An owned copy of a single encoded row, created via [`Row::owned`].
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this row
    data: Box<[u8]>,
    /// The configuration of the converter that produced this row
    config: RowConfig,
}
impl OwnedRow {
    /// Borrows this owned row as a [`Row`].
    pub fn row(&self) -> Row<'_> {
        Row {
            config: &self.config,
            data: &self.data[..],
        }
    }
}
// Equality, ordering, and hashing all delegate to the borrowed [`Row`] view,
// which in turn compares the raw encoded bytes.
impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}
impl Eq for OwnedRow {}
impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}
impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}
impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
/// Returns the sentinel byte used to encode a null value: `0` sorts nulls
/// before all non-null values (which use a non-zero sentinel), `0xFF` sorts
/// them after.
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    if options.nulls_first {
        0
    } else {
        0xFF
    }
}
enum LengthTracker {
Fixed { length: usize, num_rows: usize },
Variable {
fixed_length: usize,
lengths: Vec<usize>,
},
}
impl LengthTracker {
fn new(num_rows: usize) -> Self {
Self::Fixed {
length: 0,
num_rows,
}
}
fn push_fixed(&mut self, new_length: usize) {
match self {
LengthTracker::Fixed { length, .. } => *length += new_length,
LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
}
}
fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
match self {
LengthTracker::Fixed { length, .. } => {
*self = LengthTracker::Variable {
fixed_length: *length,
lengths: new_lengths.collect(),
}
}
LengthTracker::Variable { lengths, .. } => {
assert_eq!(lengths.len(), new_lengths.len());
lengths
.iter_mut()
.zip(new_lengths)
.for_each(|(length, new_length)| *length += new_length);
}
}
}
fn materialized(&mut self) -> &mut [usize] {
if let LengthTracker::Fixed { length, num_rows } = *self {
*self = LengthTracker::Variable {
fixed_length: length,
lengths: vec![0; num_rows],
};
}
match self {
LengthTracker::Variable { lengths, .. } => lengths,
LengthTracker::Fixed { .. } => unreachable!(),
}
}
fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
match self {
LengthTracker::Fixed { length, num_rows } => {
offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
initial_offset + num_rows * length
}
LengthTracker::Variable {
fixed_length,
lengths,
} => {
let mut acc = initial_offset;
offsets.extend(lengths.iter().map(|length| {
let current = acc;
acc += length + fixed_length;
current
}));
acc
}
}
}
}
/// Computes the encoded byte length of every row, summing the contribution of
/// each column into a single [`LengthTracker`].
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);
    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => tracker.push_fixed(2)
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i32>(array)),
                    DataType::LargeBinary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i64>(array)),
                    DataType::BinaryView => push_byte_view_array_lengths(&mut tracker, array.as_binary_view()),
                    DataType::Utf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i32>()),
                    DataType::LargeUtf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i64>()),
                    DataType::Utf8View => push_byte_view_array_lengths(&mut tracker, array.as_string_view()),
                    DataType::FixedSizeBinary(len) => {
                        // 1 validity sentinel byte + the fixed payload
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // A dictionary row is exactly the pre-encoded row of its
                // value (or the pre-encoded null row)
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row_len(k.as_usize()),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // 1 sentinel byte + the encoded child row (or the
                // pre-encoded null row); a struct with no child fields
                // contributes only the sentinel for valid entries
                let array = as_struct_array(array);
                if rows.num_rows() > 0 {
                    tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                        true => 1 + rows.row_len(idx),
                        false => 1 + null.data.len(),
                    }));
                } else {
                    tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                        true => 1,
                        false => 1 + null.data.len(),
                    }));
                }
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::ListView(_) => {
                    // Child rows were computed over values[min_offset..], so
                    // the same base offset is needed to index them
                    let list_view = array.as_list_view::<i32>();
                    let (min_offset, _) = compute_list_view_bounds(list_view);
                    list::compute_lengths_list_view(
                        tracker.materialized(),
                        rows,
                        list_view,
                        min_offset,
                    )
                }
                DataType::LargeListView(_) => {
                    let list_view = array.as_list_view::<i64>();
                    let (min_offset, _) = compute_list_view_bounds(list_view);
                    list::compute_lengths_list_view(
                        tracker.materialized(),
                        rows,
                        list_view,
                        min_offset,
                    )
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                field_to_type_ids,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");
                // Map type id -> child converter index; assumes type ids fall
                // in 0..=127 (an out-of-range id would panic on the index)
                let mut type_id_to_field_idx = [0usize; 128];
                for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                    type_id_to_field_idx[type_id as usize] = field_idx;
                }
                // 1 discriminant byte + the encoded child row. Dense unions
                // carry per-row child offsets; sparse unions reuse the parent
                // row index directly
                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    let field_idx = type_id_to_field_idx[type_id as usize];
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row_len = child_rows[field_idx].row_len(child_row_i);
                    1 + child_row_len
                });
                tracker.push_variable(lengths);
            }
        }
    }
    tracker
}
/// Accumulates into `tracker` the padded encoded length of each value in a
/// generic byte array (binary or string), taking the cheaper non-null path
/// when the array contains no nulls.
fn push_generic_byte_array_lengths<T: ByteArrayType>(
    tracker: &mut LengthTracker,
    array: &GenericByteArray<T>,
) {
    match array.nulls().filter(|n| n.null_count() > 0) {
        Some(nulls) => tracker.push_variable(
            array
                .offsets()
                .lengths()
                .zip(nulls.iter())
                // Null slots encode as `None`, valid slots by their length
                .map(|(len, is_valid)| variable::padded_length(is_valid.then_some(len))),
        ),
        None => tracker.push_variable(
            array
                .offsets()
                .lengths()
                .map(variable::non_null_padded_length),
        ),
    }
}
/// Accumulates into `tracker` the padded encoded length of each value in a
/// byte-view array (binary view or string view), handling the null and
/// null-free cases separately.
fn push_byte_view_array_lengths<T: ByteViewType>(
    tracker: &mut LengthTracker,
    array: &GenericByteViewArray<T>,
) {
    match array.nulls().filter(|n| n.null_count() > 0) {
        Some(nulls) => tracker.push_variable(
            array
                .lengths()
                .zip(nulls.iter())
                // Null slots encode as `None`, valid slots by their length
                .map(|(len, is_valid)| variable::padded_length(is_valid.then_some(len as usize))),
        ),
        None => tracker.push_variable(
            array
                .lengths()
                .map(|len| variable::padded_length(Some(len as usize))),
        ),
    }
}
/// Encodes a single column into `data`, advancing each row's entry in
/// `offsets` past the bytes written for that row.
///
/// `offsets` carries one extra leading entry (hence the `skip(1)` in the
/// variable-width paths): entry `i + 1` initially holds the write position
/// for row `i` and is advanced as each column writes its bytes.
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        // Fast path: no null checks per value
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {
                    // Null arrays write a fixed 2-byte null marker per row
                    for offset in offsets.iter_mut().skip(1) {
                        variable::encode_null_value(&mut data[*offset..], opts);
                        *offset += 2;
                    }
                }
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Each struct row is a 1-byte sentinel (0x01 for valid, the null
            // sentinel otherwise) followed by the pre-encoded child row.
            // `NO_CHILD_FIELDS` is a const so the zero-field case is
            // monomorphised to write an empty row without consulting `rows`.
            fn struct_encode_helper<const NO_CHILD_FIELDS: bool>(
                array: &StructArray,
                offsets: &mut [usize],
                null_sentinel: u8,
                rows: &Rows,
                null: &Row<'_>,
                data: &mut [u8],
            ) {
                let empty_row = Row {
                    data: &[],
                    config: &rows.config,
                };
                offsets
                    .iter_mut()
                    .skip(1)
                    .enumerate()
                    .for_each(|(idx, offset)| {
                        let (row, sentinel) = match array.is_valid(idx) {
                            true => (
                                if NO_CHILD_FIELDS {
                                    empty_row
                                } else {
                                    rows.row(idx)
                                },
                                0x01,
                            ),
                            false => (*null, null_sentinel),
                        };
                        let end_offset = *offset + 1 + row.as_ref().len();
                        data[*offset] = sentinel;
                        data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                        *offset = end_offset;
                    })
            }
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            if rows.num_rows() == 0 {
                struct_encode_helper::<true>(array, offsets, null_sentinel, rows, null, data);
            } else {
                struct_encode_helper::<false>(array, offsets, null_sentinel, rows, null, data);
            }
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::ListView(_) => {
                // Child rows were computed over values[min_offset..], so the
                // same base offset is needed to index them
                let list_view = column.as_list_view::<i32>();
                let (min_offset, _) = compute_list_view_bounds(list_view);
                list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
            }
            DataType::LargeListView(_) => {
                let list_view = column.as_list_view::<i64>();
                let (min_offset, _) = compute_list_view_bounds(list_view);
                list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        Encoder::Union {
            child_rows,
            field_to_type_ids,
            type_ids,
            offsets: offsets_buf,
        } => {
            // Map type id -> child converter index; assumes type ids fall in
            // 0..=127 (an out-of-range id would panic on the index)
            let mut type_id_to_field_idx = [0usize; 128];
            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                type_id_to_field_idx[type_id as usize] = field_idx;
            }
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];
                    let field_idx = type_id_to_field_idx[type_id as usize];
                    // Dense unions carry per-row child offsets; sparse unions
                    // reuse the parent row index directly
                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[field_idx].row(child_row_idx);
                    let child_bytes = child_row.as_ref();
                    // Invert the discriminant byte when sorting descending so
                    // byte-wise comparison reverses
                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;
                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);
                    *offset = child_end;
                });
        }
    }
}
/// Encodes a dictionary array by copying, for each key, the pre-converted row
/// of its value (or the pre-encoded null row), advancing each row's write
/// position in `offsets`.
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
    for (offset, key) in offsets.iter_mut().skip(1).zip(column.keys()) {
        // Null keys copy the pre-encoded null row
        let encoded = key.map_or(null.data, |k| values.row(k.as_usize()).data);
        let end = *offset + encoded.len();
        data[*offset..end].copy_from_slice(encoded);
        *offset = end;
    }
}
/// Helper macro invoked via `downcast_primitive!` in `decode_column`: decodes
/// a primitive array of type `$t` from `$rows` and erases it behind an
/// `Arc`-ed array.
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
/// Decodes a single column of `field.data_type` from `rows`, advancing each
/// row slice past the bytes consumed for this column.
///
/// # Safety
///
/// `rows` must contain valid row-format data produced for this field's
/// encoding. When `validate_utf8` is `false`, string payloads are trusted to
/// be valid UTF-8 without checking.
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;
    let array: ArrayRef = match codec {
        // Types whose decoding needs no per-converter state: dispatch on the
        // data type directly
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => {
                    variable::decode_null_value(rows, options);
                    Arc::new(NullArray::new(rows.len()))
                }
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionary values were encoded hydrated, so decoding yields the
        // value type rather than a dictionary
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // The leading byte of each row encodes the struct's own validity
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Decoding may change child types (e.g. dictionaries are
            // hydrated), so rebuild the struct fields from the decoded
            // children's actual types
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);
            // SAFETY: the child data decoded above is valid for the
            // corrected struct type constructed from it
            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        // One codec serves all list-like types; select the concrete decoder
        // by the list flavour and offset width
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::ListView(_) => Arc::new(unsafe {
                list::decode_list_view::<i32>(converter, rows, field, validate_utf8)
            }?),
            DataType::LargeListView(_) => Arc::new(unsafe {
                list::decode_list_view::<i64>(converter, rows, field, validate_utf8)
            }?),
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, field_to_type_ids, null_rows) => {
            let len = rows.len();
            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };
            // Invert the field-index -> type-id mapping. Union type ids are
            // not required to equal child indices, so this map is needed
            // whenever a type id read from a row must select a converter.
            let mut type_id_to_field_idx = [0usize; 128];
            for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
                type_id_to_field_idx[type_id as usize] = field_idx;
            }
            // The first byte of each row is the type id (complemented when
            // the sort order is descending); group row payloads by the union
            // field they belong to
            let mut type_ids = Vec::with_capacity(len);
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];
            for (idx, row) in rows.iter_mut().enumerate() {
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };
                let type_id = type_id_byte as i8;
                type_ids.push(type_id);
                let field_idx = type_id_to_field_idx[type_id as usize];
                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));
            }
            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
            // Offsets into the compacted children are only needed for dense
            // unions
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));
            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];
                match &mode {
                    UnionMode::Dense => {
                        // Dense children hold only the rows of their own type
                        if field_rows.is_empty() {
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }
                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();
                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;
                        // convert_raw advanced each child slice; propagate the
                        // consumed byte count (+1 for the type id byte) to the
                        // outer row slices
                        for ((row_idx, original_bytes), remaining_bytes) in
                            field_rows.iter().zip(child_data)
                        {
                            let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }
                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // Sparse children must be full length: fill the slots
                        // belonging to other types with this field's
                        // pre-encoded null row
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;
                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);
                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }
                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
                        // Advance only the rows actually of this type; the
                        // null-padding slots consumed nothing from the outer
                        // row slices
                        for (row_idx, child_row) in field_rows.iter() {
                            let remaining_len = sparse_data[*row_idx].len();
                            let consumed_length = 1 + child_row.len() - remaining_len;
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }
                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }
            // Dense unions additionally need, per row, the offset of that row
            // within its (compacted) child array
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    // Fix: map the type id back to its child index rather than
                    // using it directly — rows were grouped by
                    // `type_id_to_field_idx` above, and union type ids need
                    // not be contiguous or equal to child indices
                    let field_idx = type_id_to_field_idx[*type_id as usize];
                    offsets_vec.push(count[field_idx]);
                    count[field_idx] += 1;
                }
            }
            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);
            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;
            Arc::new(union_array)
        }
    };
    Ok(array)
}
#[cfg(test)]
mod tests {
use arrow_array::builder::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::{Buffer, OffsetBuffer};
use arrow_buffer::{NullBuffer, i256};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_ord::sort::{LexicographicalComparator, SortColumn};
use rand::distr::uniform::SampleUniform;
use rand::distr::{Distribution, StandardUniform};
use rand::prelude::StdRng;
use rand::{Rng, RngCore, SeedableRng};
use super::*;
// Round-trips two fixed-width columns (nullable Int16 and Float32) and pins
// the exact row encoding bytes and lexicographic row ordering.
#[test]
fn test_fixed_width() {
let cols = [
Arc::new(Int16Array::from_iter([
Some(1),
Some(2),
None,
Some(-5),
Some(2),
Some(2),
Some(0),
])) as ArrayRef,
Arc::new(Float32Array::from_iter([
Some(1.3),
Some(2.5),
None,
Some(4.),
Some(0.1),
Some(-4.),
Some(-0.),
])) as ArrayRef,
];
let converter = RowConverter::new(vec![
SortField::new(DataType::Int16),
SortField::new(DataType::Float32),
])
.unwrap();
let rows = converter.convert_columns(&cols).unwrap();
// Each row is 8 bytes: (1 validity byte + 2 Int16 bytes) + (1 validity
// byte + 4 Float32 bytes)
assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
// Pin the exact encoded bytes so encoding changes are caught
assert_eq!(
rows.buffer,
&[
1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
);
// Rows compare lexicographically: first by Int16, then by Float32
assert!(rows.row(3) < rows.row(6));
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(3) < rows.row(0));
assert!(rows.row(4) < rows.row(1));
assert!(rows.row(5) < rows.row(4));
// Decoding must reproduce the original columns exactly
let back = converter.convert_rows(&rows).unwrap();
for (expected, actual) in cols.iter().zip(&back) {
assert_eq!(expected, actual);
}
}
#[test]
fn test_decimal32() {
    // Decimal32 values in ascending order, with a null (which sorts first
    // under default options) at the front.
    let array = Decimal32Array::from_iter([
        None,
        Some(i32::MIN),
        Some(-13),
        Some(46_i32),
        Some(5456_i32),
        Some(i32::MAX),
    ])
    .with_precision_and_scale(9, 7)
    .unwrap();
    let col = Arc::new(array) as ArrayRef;
    let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
        DECIMAL32_MAX_PRECISION,
        7,
    ))])
    .unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    // The row encoding must preserve the ascending input order
    for i in 1..rows.num_rows() {
        assert!(rows.row(i - 1) < rows.row(i));
    }
    // Round-trip back to exactly one column equal to the input
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(back.len(), 1);
    assert_eq!(col.as_ref(), back[0].as_ref())
}
#[test]
fn test_decimal64() {
    // Decimal64 values in ascending order, with a null (which sorts first
    // under default options) at the front.
    let array = Decimal64Array::from_iter([
        None,
        Some(i64::MIN),
        Some(-13),
        Some(46_i64),
        Some(5456_i64),
        Some(i64::MAX),
    ])
    .with_precision_and_scale(18, 7)
    .unwrap();
    let col = Arc::new(array) as ArrayRef;
    let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
        DECIMAL64_MAX_PRECISION,
        7,
    ))])
    .unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    // The row encoding must preserve the ascending input order
    for i in 1..rows.num_rows() {
        assert!(rows.row(i - 1) < rows.row(i));
    }
    // Round-trip back to exactly one column equal to the input
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(back.len(), 1);
    assert_eq!(col.as_ref(), back[0].as_ref())
}
#[test]
fn test_decimal128() {
    // Decimal128 values in ascending order, with a null (which sorts first
    // under default options) at the front.
    let array = Decimal128Array::from_iter([
        None,
        Some(i128::MIN),
        Some(-13),
        Some(46_i128),
        Some(5456_i128),
        Some(i128::MAX),
    ])
    .with_precision_and_scale(38, 7)
    .unwrap();
    let col = Arc::new(array) as ArrayRef;
    let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
        DECIMAL128_MAX_PRECISION,
        7,
    ))])
    .unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    // The row encoding must preserve the ascending input order
    for i in 1..rows.num_rows() {
        assert!(rows.row(i - 1) < rows.row(i));
    }
    // Round-trip back to exactly one column equal to the input
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(back.len(), 1);
    assert_eq!(col.as_ref(), back[0].as_ref())
}
#[test]
fn test_decimal256() {
    // Decimal256 values in ascending order, crossing the i128 halves of the
    // 256-bit representation, with a null (sorts first by default) in front.
    let array = Decimal256Array::from_iter([
        None,
        Some(i256::MIN),
        Some(i256::from_parts(0, -1)),
        Some(i256::from_parts(u128::MAX, -1)),
        Some(i256::from_parts(u128::MAX, 0)),
        Some(i256::from_parts(0, 46_i128)),
        Some(i256::from_parts(5, 46_i128)),
        Some(i256::MAX),
    ])
    .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
    .unwrap();
    let col = Arc::new(array) as ArrayRef;
    let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
        DECIMAL256_MAX_PRECISION,
        7,
    ))])
    .unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    // The row encoding must preserve the ascending input order
    for i in 1..rows.num_rows() {
        assert!(rows.row(i - 1) < rows.row(i));
    }
    // Round-trip back to exactly one column equal to the input
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(back.len(), 1);
    assert_eq!(col.as_ref(), back[0].as_ref())
}
#[test]
fn test_bool() {
    let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

    // Default options (ascending, nulls first): null < false < true
    let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    for (lo, hi) in [(1, 2), (0, 2), (0, 1)] {
        assert!(rows.row(hi) > rows.row(lo));
    }
    let cols = converter.convert_rows(&rows).unwrap();
    assert_eq!(&cols[0], &col);

    // Descending with nulls last: the ordering above is exactly reversed
    let converter = RowConverter::new(vec![SortField::new_with_options(
        DataType::Boolean,
        SortOptions::default().desc().with_nulls_first(false),
    )])
    .unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    for (lo, hi) in [(1, 2), (0, 2), (0, 1)] {
        assert!(rows.row(hi) < rows.row(lo));
    }
    let cols = converter.convert_rows(&rows).unwrap();
    assert_eq!(&cols[0], &col);
}
// Verifies timezone metadata survives a round-trip, both for a plain
// timestamp column and for a dictionary-encoded timestamp column.
#[test]
fn test_timezone() {
let a =
TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
let d = a.data_type().clone();
let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
// The decoded column keeps the original timezone
assert_eq!(back[0].data_type(), &d);
// Dictionary-encoded timestamps: decoding hydrates the dictionary, so
// only the value type (with its timezone) is recovered
let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
a.append(34).unwrap();
a.append_null();
a.append(345).unwrap();
let dict = a.finish();
let values = TimestampNanosecondArray::from(dict.values().to_data());
let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
assert_eq!(dict_with_tz.data_type(), &d);
let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
let rows = converter
.convert_columns(&[Arc::new(dict_with_tz) as _])
.unwrap();
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
// Decoded as the hydrated value type, timezone intact
assert_eq!(back[0].data_type(), &v);
}
#[test]
fn test_null_encoding() {
    // A Null column still produces one encoded row per element
    let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
    let col: ArrayRef = Arc::new(NullArray::new(10));
    let rows = converter.convert_columns(&[col]).unwrap();
    assert_eq!(rows.num_rows(), 10);
    // Each row occupies exactly two bytes
    assert_eq!(rows.row(1).data.len(), 2);
}
// Exercises variable-width encoding for strings and binary, including
// payload lengths straddling the mini-block and block boundaries of the
// variable-length encoding.
#[test]
fn test_variable_width() {
let col = Arc::new(StringArray::from_iter([
Some("hello"),
Some("he"),
None,
Some("foo"),
Some(""),
])) as ArrayRef;
let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
// Prefix ordering: "he" < "hello", null < "", "foo" < "he(llo)"
assert!(rows.row(1) < rows.row(0));
assert!(rows.row(2) < rows.row(4));
assert!(rows.row(3) < rows.row(0));
assert!(rows.row(3) < rows.row(1));
let cols = converter.convert_rows(&rows).unwrap();
assert_eq!(&cols[0], &col);
// Binary values chosen to be strictly increasing, with lengths at and
// around MINI_BLOCK_SIZE / BLOCK_SIZE to hit every continuation path
let col = Arc::new(BinaryArray::from_iter([
None,
Some(vec![0_u8; 0]),
Some(vec![0_u8; 6]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0_u8; variable::BLOCK_SIZE]),
Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
Some(vec![1_u8; 6]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![1_u8; variable::BLOCK_SIZE]),
Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; 6]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
])) as ArrayRef;
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
// Ascending: every earlier row must compare strictly less than every
// later row
for i in 0..rows.num_rows() {
for j in i + 1..rows.num_rows() {
assert!(
rows.row(i) < rows.row(j),
"{} < {} - {:?} < {:?}",
i,
j,
rows.row(i),
rows.row(j)
);
}
}
let cols = converter.convert_rows(&rows).unwrap();
assert_eq!(&cols[0], &col);
// Descending with nulls last: the full ordering is reversed
let converter = RowConverter::new(vec![SortField::new_with_options(
DataType::Binary,
SortOptions::default().desc().with_nulls_first(false),
)])
.unwrap();
let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
for i in 0..rows.num_rows() {
for j in i + 1..rows.num_rows() {
assert!(
rows.row(i) > rows.row(j),
"{} > {} - {:?} > {:?}",
i,
j,
rows.row(i),
rows.row(j)
);
}
}
let cols = converter.convert_rows(&rows).unwrap();
assert_eq!(&cols[0], &col);
}
/// Asserts that `a` equals `b`, first unpacking `b` when it is
/// dictionary-encoded (row decoding hydrates dictionaries to their value
/// type, so `a` is compared against `b` cast to that type).
fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
    if let DataType::Dictionary(_, value_type) = b.data_type() {
        assert_eq!(a.data_type(), value_type.as_ref());
        let unpacked = arrow_cast::cast(b, value_type).unwrap();
        assert_eq!(a, unpacked.as_ref())
    } else {
        assert_eq!(a, b)
    }
}
// Dictionary-encoded strings: encoded rows must order and compare by the
// logical values, be interoperable across arrays sharing a converter, and
// honour descending / nulls-first options.
#[test]
fn test_string_dictionary() {
let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("foo"),
Some("hello"),
Some("he"),
None,
Some("hello"),
Some(""),
Some("hello"),
Some("hello"),
])) as ArrayRef;
let field = SortField::new(a.data_type().clone());
let converter = RowConverter::new(vec![field]).unwrap();
let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
// Ordering follows logical string values, null first
assert!(rows_a.row(3) < rows_a.row(5));
assert!(rows_a.row(2) < rows_a.row(1));
assert!(rows_a.row(0) < rows_a.row(1));
assert!(rows_a.row(3) < rows_a.row(0));
// Equal logical values encode to identical rows regardless of key
assert_eq!(rows_a.row(1), rows_a.row(4));
assert_eq!(rows_a.row(1), rows_a.row(6));
assert_eq!(rows_a.row(1), rows_a.row(7));
let cols = converter.convert_rows(&rows_a).unwrap();
dictionary_eq(&cols[0], &a);
// A second array through the same converter produces comparable rows
let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("hello"),
None,
Some("cupcakes"),
])) as ArrayRef;
let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
assert_eq!(rows_a.row(1), rows_b.row(0));
assert_eq!(rows_a.row(3), rows_b.row(1));
assert!(rows_b.row(2) < rows_a.row(0));
let cols = converter.convert_rows(&rows_b).unwrap();
dictionary_eq(&cols[0], &b);
// Descending, nulls last: comparisons invert
let converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
SortOptions::default().desc().with_nulls_first(false),
)])
.unwrap();
let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
assert!(rows_c.row(3) > rows_c.row(5));
assert!(rows_c.row(2) > rows_c.row(1));
assert!(rows_c.row(0) > rows_c.row(1));
assert!(rows_c.row(3) > rows_c.row(0));
let cols = converter.convert_rows(&rows_c).unwrap();
dictionary_eq(&cols[0], &a);
// Descending, nulls first: null rows sort before everything else
let converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
SortOptions::default().desc().with_nulls_first(true),
)])
.unwrap();
let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
assert!(rows_c.row(3) < rows_c.row(5));
assert!(rows_c.row(2) > rows_c.row(1));
assert!(rows_c.row(0) > rows_c.row(1));
assert!(rows_c.row(3) < rows_c.row(0));
let cols = converter.convert_rows(&rows_c).unwrap();
dictionary_eq(&cols[0], &a);
}
// Round-trips a non-null struct column, then the same column with a
// struct-level null mask, verifying row equality/ordering across the two.
#[test]
fn test_struct() {
let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
let a_f = Arc::new(Field::new("int", DataType::Int32, false));
let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
let sort_fields = vec![SortField::new(s1.data_type().clone())];
let converter = RowConverter::new(sort_fields).unwrap();
let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
// (int, s) pairs are strictly increasing, so rows must be too
for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
assert!(a < b);
}
let back = converter.convert_rows(&r1).unwrap();
assert_eq!(back.len(), 1);
assert_eq!(&back[0], &s1);
// Apply a struct-level validity mask (0b00001010 keeps rows 1 and 3,
// nulling rows 0 and 2)
let data = s1
.to_data()
.into_builder()
.null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
.null_count(2)
.build()
.unwrap();
let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
// Null structs encode identically and sort before non-null ones; rows
// that stayed valid encode identically to their r1 counterparts
assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1));
let back = converter.convert_rows(&r2).unwrap();
assert_eq!(back.len(), 1);
assert_eq!(&back[0], &s2);
back[0].to_data().validate_full().unwrap();
}
// A dictionary column nested inside a struct: decoding hydrates the
// dictionary, so the decoded struct has a different (non-dictionary) type
// but must carry the same logical values, nulls included.
#[test]
fn test_dictionary_in_struct() {
let builder = StringDictionaryBuilder::<Int32Type>::new();
let mut struct_builder = StructBuilder::new(
vec![Field::new_dictionary(
"foo",
DataType::Int32,
DataType::Utf8,
true,
)],
vec![Box::new(builder)],
);
let dict_builder = struct_builder
.field_builder::<StringDictionaryBuilder<Int32Type>>(0)
.unwrap();
dict_builder.append_value("a");
dict_builder.append_null();
dict_builder.append_value("a");
dict_builder.append_value("b");
for _ in 0..4 {
struct_builder.append(true);
}
let s = Arc::new(struct_builder.finish()) as ArrayRef;
let sort_fields = vec![SortField::new(s.data_type().clone())];
let converter = RowConverter::new(sort_fields).unwrap();
let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
let back = converter.convert_rows(&r).unwrap();
let [s2] = back.try_into().unwrap();
// The decoded type differs because the dictionary child was hydrated
assert_ne!(&s.data_type(), &s2.data_type());
s2.to_data().validate_full().unwrap();
// Compare the original dictionary child against the decoded string
// child element by element
let s1_struct = s.as_struct();
let s1_0 = s1_struct.column(0);
let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
let keys = s1_idx_0.keys();
let values = s1_idx_0.values().as_string::<i32>();
let s2_struct = s2.as_struct();
let s2_0 = s2_struct.column(0);
let s2_idx_0 = s2_0.as_string::<i32>();
for i in 0..keys.len() {
if keys.is_null(i) {
assert!(s2_idx_0.is_null(i));
} else {
let dict_index = keys.value(i) as usize;
assert_eq!(values.value(dict_index), s2_idx_0.value(i));
}
}
}
#[test]
fn test_dictionary_in_struct_empty() {
    // A zero-length struct column containing a non-null dictionary field
    let ty = DataType::Struct(
        vec![Field::new_dictionary(
            "foo",
            DataType::Int32,
            DataType::Int32,
            false,
        )]
        .into(),
    );
    let empty = arrow_array::new_empty_array(&ty);
    let sort_fields = vec![SortField::new(empty.data_type().clone())];
    let converter = RowConverter::new(sort_fields).unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&empty)]).unwrap();
    let decoded = converter.convert_rows(&rows).unwrap();
    let [s2] = decoded.try_into().unwrap();
    // Dictionaries are hydrated on decode, so the type changes...
    assert_ne!(&empty.data_type(), &s2.data_type());
    s2.to_data().validate_full().unwrap();
    // ...but both arrays remain empty
    assert_eq!(empty.len(), 0);
    assert_eq!(s2.len(), 0);
}
// A list of dictionary-encoded strings: decoding hydrates the dictionary,
// so the decoded list has a different child type but the same logical
// values, preserving list-level nulls.
#[test]
fn test_list_of_string_dictionary() {
let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
builder.values().append("a").unwrap();
builder.values().append("b").unwrap();
builder.values().append("zero").unwrap();
builder.values().append_null();
builder.values().append("c").unwrap();
builder.values().append("b").unwrap();
builder.values().append("d").unwrap();
builder.append(true);
// Second list entry is null
builder.append(false);
builder.values().append("e").unwrap();
builder.values().append("zero").unwrap();
builder.values().append("a").unwrap();
builder.append(true);
let a = Arc::new(builder.finish()) as ArrayRef;
let data_type = a.data_type().clone();
let field = SortField::new(data_type.clone());
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
let [a2] = back.try_into().unwrap();
// The decoded type differs because the dictionary child was hydrated
assert_ne!(&a.data_type(), &a2.data_type());
a2.to_data().validate_full().unwrap();
// Element-wise comparison of list 0: dictionary keys/values vs strings
let a2_list = a2.as_list::<i32>();
let a1_list = a.as_list::<i32>();
let a1_0 = a1_list.value(0);
let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
let keys = a1_idx_0.keys();
let values = a1_idx_0.values().as_string::<i32>();
let a2_0 = a2_list.value(0);
let a2_idx_0 = a2_0.as_string::<i32>();
for i in 0..keys.len() {
if keys.is_null(i) {
assert!(a2_idx_0.is_null(i));
} else {
let dict_index = keys.value(i) as usize;
assert_eq!(values.value(dict_index), a2_idx_0.value(i));
}
}
// The null list entry must remain null after the round-trip
assert!(a1_list.is_null(1));
assert!(a2_list.is_null(1));
// Element-wise comparison of list 2
let a1_2 = a1_list.value(2);
let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
let keys = a1_idx_2.keys();
let values = a1_idx_2.values().as_string::<i32>();
let a2_2 = a2_list.value(2);
let a2_idx_2 = a2_2.as_string::<i32>();
for i in 0..keys.len() {
if keys.is_null(i) {
assert!(a2_idx_2.is_null(i));
} else {
let dict_index = keys.value(i) as usize;
assert_eq!(values.value(dict_index), a2_idx_2.value(i));
}
}
}
#[test]
fn test_primitive_dictionary() {
    // Dictionary-encoded Int32 column: [2, 3, 0, null, 5, 3, -1]
    let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
    for value in [Some(2), Some(3), Some(0), None, Some(5), Some(3), Some(-1)] {
        match value {
            Some(v) => {
                builder.append(v).unwrap();
            }
            None => builder.append_null(),
        }
    }
    let a = builder.finish();
    let data_type = a.data_type().clone();
    let columns = [Arc::new(a) as ArrayRef];
    let converter = RowConverter::new(vec![SortField::new(data_type)]).unwrap();
    let rows = converter.convert_columns(&columns).unwrap();
    // Rows order by logical value: null < -1 < 0 < 2 < 3 (< 5)
    for (lo, hi) in [(0, 1), (2, 0), (3, 2), (6, 2), (3, 6)] {
        assert!(rows.row(lo) < rows.row(hi));
    }
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(back.len(), 1);
    back[0].to_data().validate_full().unwrap();
}
#[test]
fn test_dictionary_nulls() {
    // A dictionary with nulls in both the values array and the keys array,
    // so logical nulls can arise either way.
    let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
    let keys =
        Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
    let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
    let dict_data = keys
        .into_builder()
        .data_type(data_type.clone())
        .child_data(vec![values])
        .build()
        .unwrap();
    let columns = [Arc::new(DictionaryArray::<Int32Type>::from(dict_data)) as ArrayRef];
    let converter = RowConverter::new(vec![SortField::new(data_type)]).unwrap();
    let rows = converter.convert_columns(&columns).unwrap();
    // Rows 0 and 1 share key 0 and must encode identically
    assert_eq!(rows.row(0), rows.row(1));
    // Rows 3, 4 and 5 are all logically null (null value, null value,
    // null key) and must encode identically
    assert_eq!(rows.row(3), rows.row(4));
    assert_eq!(rows.row(4), rows.row(5));
    // Nulls sort before non-null values by default
    assert!(rows.row(3) < rows.row(0));
}
#[test]
fn test_from_binary_shared_buffer() {
    let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
    let array: ArrayRef = Arc::new(BinaryArray::from_iter_values([&[0xFF]]));
    let rows = converter.convert_columns(&[array]).unwrap();
    let binary_rows = rows.try_into_binary().expect("known-small rows");
    // Keep a second reference alive so the underlying buffer is shared
    // while the rows are parsed back
    let _shared = binary_rows.clone();
    let parsed = converter.from_binary(binary_rows);
    converter.convert_rows(parsed.iter()).unwrap();
}
#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8() {
    // Encode a non-UTF-8 byte through a Binary converter...
    let binary_converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
    let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
    let rows = binary_converter.convert_columns(&[array]).unwrap();
    let binary_row = rows.row(0);
    // ...then reinterpret the same row bytes as Utf8, which must panic
    let utf8_converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parser = utf8_converter.parser();
    let utf8_row = parser.parse(binary_row.as_ref());
    utf8_converter
        .convert_rows(std::iter::once(utf8_row))
        .unwrap();
}
#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8_array() {
    // Encode a non-UTF-8 byte through a Binary converter...
    let binary_converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
    let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
    let rows = binary_converter.convert_columns(&[array]).unwrap();
    let binary_rows = rows.try_into_binary().expect("known-small rows");
    // ...then feed the serialized rows to a Utf8 converter, which must panic
    let utf8_converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = utf8_converter.from_binary(binary_rows);
    utf8_converter.convert_rows(parsed.iter()).unwrap();
}
#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty() {
    // A zero-length row has no bytes at all for the Utf8 column to decode
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parser = converter.parser();
    let empty: &[u8] = &[];
    let utf8_row = parser.parse(empty);
    converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}
#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty_array() {
    // A serialized rows array containing one zero-length row must panic
    // when decoded as Utf8
    let empty: &[u8] = &[];
    let binary_rows = BinaryArray::from(vec![empty]);
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = converter.from_binary(binary_rows);
    converter.convert_rows(parsed.iter()).unwrap();
}
#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated() {
    // A single sentinel byte with no payload: the decoder runs off the end
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parser = converter.parser();
    let truncated: &[u8] = &[0x02];
    let utf8_row = parser.parse(truncated);
    converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}
#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated_array() {
    // A serialized row holding only a sentinel byte with no payload: the
    // decoder runs off the end
    let truncated: &[u8] = &[0x02];
    let binary_rows = BinaryArray::from(vec![truncated]);
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = converter.from_binary(binary_rows);
    converter.convert_rows(parsed.iter()).unwrap();
}
#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
    // Rows produced by one converter must be rejected by another, even when
    // both converters have identical field configurations
    let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
    let producer = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let rows = producer.convert_columns(&[values]).unwrap();
    let other = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let _ = other.convert_rows(&rows);
}
// Round-trips a list column under all four (asc/desc × nulls-first/last)
// option combinations, then over a slice. The built lists are, in order:
// [32, 52, 32], [32, 52, 12], [32, 52], null, [32, null], [], null.
fn test_single_list<O: OffsetSizeTrait>() {
let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(32);
builder.append(true);
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(12);
builder.append(true);
builder.values().append_value(32);
builder.values().append_value(52);
builder.append(true);
// Row 3: values are appended but the entry is marked null
builder.values().append_value(32); builder.values().append_value(52); builder.append(false);
builder.values().append_value(32);
builder.values().append_null();
builder.append(true);
// Row 5: an empty (but valid) list
builder.append(true);
// Row 6: another masked-null entry with discarded values
builder.values().append_value(17); builder.values().append_null(); builder.append(false);
let list = Arc::new(builder.finish()) as ArrayRef;
let d = list.data_type().clone();
// Default options: ascending, nulls first
let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Ascending, nulls last
let options = SortOptions::default().asc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls last
let options = SortOptions::default().desc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls first
let options = SortOptions::default().desc().with_nulls_first(true);
let field = SortField::new_with_options(d, options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Slices must encode based on their own logical offsets
let sliced_list = list.slice(1, 5);
let rows_on_sliced_list = converter
.convert_columns(&[Arc::clone(&sliced_list)])
.unwrap();
assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &sliced_list);
}
// Round-trips a list-of-list column (with nulls at both nesting levels)
// under several sort-option combinations, then over a slice.
fn test_nested_list<O: OffsetSizeTrait>() {
let mut builder =
GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
// Row 0: [[1, 2], [1, null]]
builder.values().values().append_value(1);
builder.values().values().append_value(2);
builder.values().append(true);
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.append(true);
// Row 1: [[1, null], [1, null]]
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.append(true);
// Row 2: [[1, null], null]
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.values().append(false);
builder.append(true);
// Row 3: null
builder.append(false);
// Row 4: [[1, 2]]
builder.values().values().append_value(1);
builder.values().values().append_value(2);
builder.values().append(true);
builder.append(true);
let list = Arc::new(builder.finish()) as ArrayRef;
let d = list.data_type().clone();
// Ascending, nulls first
let options = SortOptions::default().asc().with_nulls_first(true);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1));
assert!(rows.row(1) > rows.row(2));
assert!(rows.row(2) > rows.row(3));
assert!(rows.row(4) < rows.row(0));
assert!(rows.row(4) > rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls first
let options = SortOptions::default().desc().with_nulls_first(true);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1));
assert!(rows.row(1) > rows.row(2));
assert!(rows.row(2) > rows.row(3));
assert!(rows.row(4) > rows.row(0));
assert!(rows.row(4) > rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls last
let options = SortOptions::default().desc().with_nulls_first(false);
let field = SortField::new_with_options(d, options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(1) < rows.row(2));
assert!(rows.row(2) < rows.row(3));
assert!(rows.row(4) > rows.row(0));
assert!(rows.row(4) < rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Slices must encode based on their own logical offsets
let sliced_list = list.slice(1, 3);
let rows = converter
.convert_columns(&[Arc::clone(&sliced_list)])
.unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(1) < rows.row(2));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &sliced_list);
}
/// Runs the single- and nested-list suites with 32-bit offsets (`List`).
#[test]
fn test_list() {
test_single_list::<i32>();
test_nested_list::<i32>();
}
/// Runs the single- and nested-list suites with 64-bit offsets (`LargeList`).
#[test]
fn test_large_list() {
test_single_list::<i64>();
test_nested_list::<i64>();
}
/// Row-encoding tests for `ListView<Int32>` / `LargeListView<Int32>`.
/// Builds rows [32,52,32], [32,52,12], [32,52], NULL, [32,NULL], [] and NULL,
/// then checks ordering for all four asc/desc x nulls-first/last combinations
/// plus a sliced input. The round-tripped array is compared element-wise
/// rather than with `assert_eq!`, since a reconstructed list-view may use a
/// different physical layout (offsets/sizes) while being logically equal.
fn test_single_list_view<O: OffsetSizeTrait>() {
let mut builder = GenericListViewBuilder::<O, _>::new(Int32Builder::new());
// Row 0: [32, 52, 32]
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(32);
builder.append(true);
// Row 1: [32, 52, 12]
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(12);
builder.append(true);
// Row 2: [32, 52]
builder.values().append_value(32);
builder.values().append_value(52);
builder.append(true);
// Row 3: NULL (the child values appended before `append(false)` are not part
// of any visible row).
builder.values().append_value(32); builder.values().append_value(52); builder.append(false);
// Row 4: [32, NULL]
builder.values().append_value(32);
builder.values().append_null();
builder.append(true);
// Row 5: [] (valid but empty)
builder.append(true);
// Row 6: NULL again, with different bytes behind it; must encode identically
// to row 3.
builder.values().append_value(17); builder.values().append_null(); builder.append(false);
let list = Arc::new(builder.finish()) as ArrayRef;
let d = list.data_type().clone();
// Default options: ascending, nulls first.
let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Element-wise logical comparison; physical layout may legitimately differ.
let back_list_view = back[0]
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
let orig_list_view = list
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
assert_eq!(back_list_view.len(), orig_list_view.len());
for i in 0..back_list_view.len() {
assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
if back_list_view.is_valid(i) {
assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
}
}
// Ascending, nulls last.
let options = SortOptions::default().asc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Descending, nulls last.
let options = SortOptions::default().desc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Descending, nulls first.
let options = SortOptions::default().desc().with_nulls_first(true);
let field = SortField::new_with_options(d, options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Rows 1..6 of the original; slicing must not change the encodings.
let sliced_list = list.slice(1, 5);
let rows_on_sliced_list = converter
.convert_columns(&[Arc::clone(&sliced_list)])
.unwrap();
assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
}
/// Row-encoding tests for `ListView<ListView<Int32>>`, mirroring
/// `test_nested_list` but for the list-view layout. Round-tripped arrays are
/// compared element-wise because a reconstructed list-view may use a
/// different physical layout (offsets/sizes) than the input while being
/// logically equal.
fn test_nested_list_view<O: OffsetSizeTrait>() {
let mut builder = GenericListViewBuilder::<O, _>::new(GenericListViewBuilder::<O, _>::new(
Int32Builder::new(),
));
// Row 0: [[1, 2], [1, NULL]]
builder.values().values().append_value(1);
builder.values().values().append_value(2);
builder.values().append(true);
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.append(true);
// Row 1: [[1, NULL], [1, NULL]]
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.append(true);
// Row 2: [[1, NULL], NULL] — a null *inner* list inside a valid row.
builder.values().values().append_value(1);
builder.values().values().append_null();
builder.values().append(true);
builder.values().append(false);
builder.append(true);
// Row 3: NULL (top-level null row)
builder.append(false);
// Row 4: [[1, 2]]
builder.values().values().append_value(1);
builder.values().values().append_value(2);
builder.values().append(true);
builder.append(true);
let list = Arc::new(builder.finish()) as ArrayRef;
let d = list.data_type().clone();
// Ascending, nulls first.
let options = SortOptions::default().asc().with_nulls_first(true);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1));
assert!(rows.row(1) > rows.row(2));
assert!(rows.row(2) > rows.row(3));
assert!(rows.row(4) < rows.row(0));
assert!(rows.row(4) > rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Element-wise logical comparison; physical layout may legitimately differ.
let back_list_view = back[0]
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
let orig_list_view = list
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
assert_eq!(back_list_view.len(), orig_list_view.len());
for i in 0..back_list_view.len() {
assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
if back_list_view.is_valid(i) {
assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
}
}
// Descending, nulls first.
let options = SortOptions::default().desc().with_nulls_first(true);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1));
assert!(rows.row(1) > rows.row(2));
assert!(rows.row(2) > rows.row(3));
assert!(rows.row(4) > rows.row(0));
assert!(rows.row(4) > rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
let back_list_view = back[0]
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
assert_eq!(back_list_view.len(), orig_list_view.len());
for i in 0..back_list_view.len() {
assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
if back_list_view.is_valid(i) {
assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
}
}
// Descending, nulls last.
let options = SortOptions::default().desc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(1) < rows.row(2));
assert!(rows.row(2) < rows.row(3));
assert!(rows.row(4) > rows.row(0));
assert!(rows.row(4) < rows.row(1));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
let back_list_view = back[0]
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
assert_eq!(back_list_view.len(), orig_list_view.len());
for i in 0..back_list_view.len() {
assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
if back_list_view.is_valid(i) {
assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
}
}
// Rows 1..4 of the original; slicing must not change the encodings.
let sliced_list = list.slice(1, 3);
let rows = converter
.convert_columns(&[Arc::clone(&sliced_list)])
.unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(1) < rows.row(2));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
}
/// Runs the list-view suites with 32-bit offsets (`ListView`).
#[test]
fn test_list_view() {
test_single_list_view::<i32>();
test_nested_list_view::<i32>();
}
/// Runs the list-view suites with 64-bit offsets (`LargeListView`).
#[test]
fn test_large_list_view() {
test_single_list_view::<i64>();
test_nested_list_view::<i64>();
}
/// Row-encoding tests for list-views whose entries overlap and share the
/// same child values buffer — a layout offset-based lists cannot express.
/// With values [1..=8], the (offset, size) pairs below yield the logical
/// rows: row0=[1,2,3], row1=[1,2,3] (an independent duplicate view),
/// row2=[6,7], row3=[3,4], row4=[2,3,4,5], row5=[3].
fn test_list_view_with_shared_values<O: OffsetSizeTrait>() {
let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
let field = Arc::new(Field::new_list_field(DataType::Int32, true));
let offsets = ScalarBuffer::<O>::from(vec![
O::from_usize(0).unwrap(),
O::from_usize(0).unwrap(),
O::from_usize(5).unwrap(),
O::from_usize(2).unwrap(),
O::from_usize(1).unwrap(),
O::from_usize(2).unwrap(),
]);
let sizes = ScalarBuffer::<O>::from(vec![
O::from_usize(3).unwrap(),
O::from_usize(3).unwrap(),
O::from_usize(2).unwrap(),
O::from_usize(2).unwrap(),
O::from_usize(4).unwrap(),
O::from_usize(1).unwrap(),
]);
let list_view: GenericListViewArray<O> =
GenericListViewArray::try_new(field, offsets, sizes, Arc::new(values), None).unwrap();
let d = list_view.data_type().clone();
let list = Arc::new(list_view) as ArrayRef;
// Default options: ascending, nulls first.
let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
// Rows with identical logical contents must encode identically even though
// their views were constructed independently.
assert_eq!(rows.row(0), rows.row(1));
assert!(rows.row(0) < rows.row(2));
assert!(rows.row(3) > rows.row(0));
assert!(rows.row(4) > rows.row(0));
assert!(rows.row(5) < rows.row(3));
assert!(rows.row(5) > rows.row(4));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
// Element-wise logical comparison; the round-tripped array's physical
// layout may legitimately differ from the shared-values input.
let back_list_view = back[0]
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
let orig_list_view = list
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
.unwrap();
assert_eq!(back_list_view.len(), orig_list_view.len());
for i in 0..back_list_view.len() {
assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
if back_list_view.is_valid(i) {
assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
}
}
// Descending: strict comparisons flip, equal rows stay equal.
let options = SortOptions::default().desc();
let field = SortField::new_with_options(d, options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert_eq!(rows.row(0), rows.row(1)); assert!(rows.row(0) > rows.row(2)); assert!(rows.row(3) < rows.row(0));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
}
/// Runs the shared-values list-view test with 32-bit offsets.
#[test]
fn test_list_view_shared_values() {
test_list_view_with_shared_values::<i32>();
}
/// Runs the shared-values list-view test with 64-bit offsets.
#[test]
fn test_large_list_view_shared_values() {
test_list_view_with_shared_values::<i64>();
}
/// Row-encoding tests for `FixedSizeList<Int32, 3>` covering fully valid
/// rows, rows with null elements, an all-null row, and null list slots with
/// differing child values behind them (which must still encode identically).
/// Checks ordering for all four asc/desc x nulls-first/last combinations,
/// exact round-trips, and a sliced input.
#[test]
fn test_fixed_size_list() {
let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
// Row 0: [32, 52, 32]
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(32);
builder.append(true);
// Row 1: [32, 52, 12]
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_value(12);
builder.append(true);
// Row 2: [32, 52, NULL]
builder.values().append_value(32);
builder.values().append_value(52);
builder.values().append_null();
builder.append(true);
// Row 3: NULL list slot; the three child values behind it are unused data.
builder.values().append_value(32); builder.values().append_value(52); builder.values().append_value(13); builder.append(false);
// Row 4: [32, NULL, NULL]
builder.values().append_value(32);
builder.values().append_null();
builder.values().append_null();
builder.append(true);
// Row 5: [NULL, NULL, NULL] — a *valid* list of all-null elements.
builder.values().append_null();
builder.values().append_null();
builder.values().append_null();
builder.append(true);
// Row 6: another NULL list slot with different values behind it; it must
// encode identically to row 3.
builder.values().append_value(17); builder.values().append_null(); builder.values().append_value(77); builder.append(false);
let list = Arc::new(builder.finish()) as ArrayRef;
let d = list.data_type().clone();
// Default options: ascending, nulls first.
let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Ascending, nulls last.
let options = SortOptions::default().asc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls last.
let options = SortOptions::default().desc().with_nulls_first(false);
let field = SortField::new_with_options(d.clone(), options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Descending, nulls first.
let options = SortOptions::default().desc().with_nulls_first(true);
let field = SortField::new_with_options(d, options);
let converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6));
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &list);
// Rows 1..6 of the original; slicing must not change per-row encodings.
let sliced_list = list.slice(1, 5);
let rows_on_sliced_list = converter
.convert_columns(&[Arc::clone(&sliced_list)])
.unwrap();
assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
assert_eq!(back.len(), 1);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &sliced_list);
}
/// Round-trips two `FixedSizeList<UInt8, 1>` columns (valid values, a valid
/// list holding a null element, and a null list slot) through the row format
/// together, verifying multi-column conversion reconstructs both exactly.
#[test]
fn test_two_fixed_size_lists() {
let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
first.values().append_value(100);
first.append(true);
first.values().append_value(101);
first.append(true);
first.values().append_value(102);
first.append(true);
// [NULL]: a valid list containing a single null element.
first.values().append_null();
first.append(true);
// NULL list slot; a child value is still appended alongside it.
first.values().append_null(); first.append(false);
let first = Arc::new(first.finish()) as ArrayRef;
let first_type = first.data_type().clone();
// Second column mirrors the first with different values.
let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
second.values().append_value(200);
second.append(true);
second.values().append_value(201);
second.append(true);
second.values().append_value(202);
second.append(true);
second.values().append_null();
second.append(true);
second.values().append_null(); second.append(false);
let second = Arc::new(second.finish()) as ArrayRef;
let second_type = second.data_type().clone();
// Convert both columns together and expect exact reconstruction of each.
let converter = RowConverter::new(vec![
SortField::new(first_type.clone()),
SortField::new(second_type.clone()),
])
.unwrap();
let rows = converter
.convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
.unwrap();
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 2);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &first);
back[1].to_data().validate_full().unwrap();
assert_eq!(&back[1], &second);
}
/// Round-trips a `FixedSizeList<Struct, 1>` whose struct carries a
/// variable-width field (`Utf8`) alongside fixed-width ones, together with a
/// plain string column. Exercises null structs inside both null and valid
/// list slots.
#[test]
fn test_fixed_size_list_with_variable_width_content() {
let mut first = FixedSizeListBuilder::new(
StructBuilder::from_fields(
vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
false,
),
Field::new("offset_minutes", DataType::Int16, false),
Field::new("time_zone", DataType::Utf8, false),
],
1,
),
1,
);
// Row 0: NULL list slot whose single struct entry is itself null.
first
.values()
.field_builder::<TimestampMicrosecondBuilder>(0)
.unwrap()
.append_null();
first
.values()
.field_builder::<Int16Builder>(1)
.unwrap()
.append_null();
first
.values()
.field_builder::<StringBuilder>(2)
.unwrap()
.append_null();
first.values().append(false);
first.append(false);
// Row 1: valid list slot containing a null struct.
first
.values()
.field_builder::<TimestampMicrosecondBuilder>(0)
.unwrap()
.append_null();
first
.values()
.field_builder::<Int16Builder>(1)
.unwrap()
.append_null();
first
.values()
.field_builder::<StringBuilder>(2)
.unwrap()
.append_null();
first.values().append(false);
first.append(true);
// Row 2: [{timestamp: 0, offset_minutes: 0, time_zone: "UTC"}]
first
.values()
.field_builder::<TimestampMicrosecondBuilder>(0)
.unwrap()
.append_value(0);
first
.values()
.field_builder::<Int16Builder>(1)
.unwrap()
.append_value(0);
first
.values()
.field_builder::<StringBuilder>(2)
.unwrap()
.append_value("UTC");
first.values().append(true);
first.append(true);
// Row 3: [{timestamp: 1126351800123456, offset_minutes: 120,
//          time_zone: "Europe/Warsaw"}]
first
.values()
.field_builder::<TimestampMicrosecondBuilder>(0)
.unwrap()
.append_value(1126351800123456);
first
.values()
.field_builder::<Int16Builder>(1)
.unwrap()
.append_value(120);
first
.values()
.field_builder::<StringBuilder>(2)
.unwrap()
.append_value("Europe/Warsaw");
first.values().append(true);
first.append(true);
let first = Arc::new(first.finish()) as ArrayRef;
let first_type = first.data_type().clone();
// Second column: a plain nullable string column of matching length.
let mut second = StringBuilder::new();
second.append_value("somewhere near");
second.append_null();
second.append_value("Greenwich");
second.append_value("Warsaw");
let second = Arc::new(second.finish()) as ArrayRef;
let second_type = second.data_type().clone();
// Convert both columns together and expect exact reconstruction of each.
let converter = RowConverter::new(vec![
SortField::new(first_type.clone()),
SortField::new(second_type.clone()),
])
.unwrap();
let rows = converter
.convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
.unwrap();
let back = converter.convert_rows(&rows).unwrap();
assert_eq!(back.len(), 2);
back[0].to_data().validate_full().unwrap();
assert_eq!(&back[0], &first);
back[1].to_data().validate_full().unwrap();
assert_eq!(&back[1], &second);
}
/// Generates a `PrimitiveArray<K>` of `len` random values; each slot is
/// valid with probability `valid_percent`, and invalid slots become nulls.
fn generate_primitive_array<K>(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> PrimitiveArray<K>
where
    K: ArrowPrimitiveType,
    StandardUniform: Distribution<K::Native>,
{
    // A value is only drawn for valid slots, keeping the RNG call order
    // identical to the closure-based formulation.
    let mut slots = Vec::with_capacity(len);
    for _ in 0..len {
        if rng.random_bool(valid_percent) {
            slots.push(Some(rng.random()));
        } else {
            slots.push(None);
        }
    }
    slots.into_iter().collect()
}
/// Generates a `BooleanArray` of `len` random booleans; each slot is valid
/// with probability `valid_percent`, and invalid slots become nulls.
fn generate_boolean_array(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> BooleanArray {
    let mut slots: Vec<Option<bool>> = Vec::with_capacity(len);
    for _ in 0..len {
        // The payload bool is only drawn when the slot is valid, preserving
        // the original RNG call order.
        if rng.random_bool(valid_percent) {
            slots.push(Some(rng.random_bool(0.5)));
        } else {
            slots.push(None);
        }
    }
    slots.into_iter().collect()
}
/// Generates a `GenericStringArray<O>` of `len` random ASCII strings
/// (lengths in 0..100, byte values in 0..128); each slot is valid with
/// probability `valid_percent`.
fn generate_strings<O: OffsetSizeTrait>(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> GenericStringArray<O> {
    let mut out: Vec<Option<String>> = Vec::with_capacity(len);
    for _ in 0..len {
        if rng.random_bool(valid_percent) {
            // Bytes < 128 are valid single-byte UTF-8, so from_utf8 cannot fail.
            let str_len = rng.random_range(0..100);
            let mut bytes: Vec<u8> = Vec::new();
            for _ in 0..str_len {
                bytes.push(rng.random_range(0..128));
            }
            out.push(Some(String::from_utf8(bytes).unwrap()));
        } else {
            out.push(None);
        }
    }
    out.into_iter().collect()
}
/// Generates a `StringViewArray` of `len` random ASCII strings (lengths in
/// 0..100, byte values in 0..128); each slot is valid with probability
/// `valid_percent`.
fn generate_string_view(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> StringViewArray {
    let mut out: Vec<Option<String>> = Vec::with_capacity(len);
    for _ in 0..len {
        if rng.random_bool(valid_percent) {
            // Bytes < 128 are valid single-byte UTF-8, so from_utf8 cannot fail.
            let str_len = rng.random_range(0..100);
            let mut bytes: Vec<u8> = Vec::new();
            for _ in 0..str_len {
                bytes.push(rng.random_range(0..128));
            }
            out.push(Some(String::from_utf8(bytes).unwrap()));
        } else {
            out.push(None);
        }
    }
    out.into_iter().collect()
}
/// Generates a `BinaryViewArray` of `len` random byte strings (lengths in
/// 0..100, byte values in 0..128); each slot is valid with probability
/// `valid_percent`.
fn generate_byte_view(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> BinaryViewArray {
    let mut out: Vec<Option<Vec<u8>>> = Vec::with_capacity(len);
    for _ in 0..len {
        let slot = if rng.random_bool(valid_percent) {
            let byte_len = rng.random_range(0..100);
            let mut bytes: Vec<u8> = Vec::new();
            for _ in 0..byte_len {
                bytes.push(rng.random_range(0..128));
            }
            Some(bytes)
        } else {
            None
        };
        out.push(slot);
    }
    out.into_iter().collect()
}
/// Builds a `StringViewArray` of `len` deterministic edge-case strings,
/// repeating the fixed set cyclically. The cases cover short inline views
/// (<= 12 bytes), long views that differ only in the final byte, embedded
/// NUL bytes, shared prefixes of differing length, and strings longer than
/// 256 bytes.
fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
    let edge_cases = [
        Some("bar".to_string()),
        Some("bar\0".to_string()),
        Some("LongerThan12Bytes".to_string()),
        Some("LongerThan12Bytez".to_string()),
        Some("LongerThan12Bytes\0".to_string()),
        Some("LongerThan12Byt".to_string()),
        Some("backend one".to_string()),
        Some("backend two".to_string()),
        Some("a".repeat(257)),
        Some("a".repeat(300)),
    ];
    // Cycle through the edge cases; element i is edge_cases[i % n]. The old
    // `.get(i % n).cloned().unwrap_or(None)` fallback was dead code — the
    // modulo index is always in bounds, so `None` could never be produced
    // by it.
    let values: Vec<Option<String>> = edge_cases.iter().cloned().cycle().take(len).collect();
    StringViewArray::from(values)
}
/// Generates a `DictionaryArray<K>` of `len` keys drawn uniformly over the
/// indices of `values`; each key is valid with probability `valid_percent`.
/// The `values` array is attached verbatim as child data, so it may contain
/// entries never referenced by any key.
fn generate_dictionary<K>(
rng: &mut impl RngCore,
values: ArrayRef,
len: usize,
valid_percent: f64,
) -> DictionaryArray<K>
where
K: ArrowDictionaryKeyType,
K::Native: SampleUniform,
{
// Keys index into `values`; the exclusive upper bound keeps them in range.
let min_key = K::Native::from_usize(0).unwrap();
let max_key = K::Native::from_usize(values.len()).unwrap();
let keys: PrimitiveArray<K> = (0..len)
.map(|_| {
rng.random_bool(valid_percent)
.then(|| rng.random_range(min_key..max_key))
})
.collect();
// Rebuild the key array's ArrayData with the dictionary type and the
// values attached as child data, then wrap it as a DictionaryArray.
let data_type =
DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
let data = keys
.into_data()
.into_builder()
.data_type(data_type)
.add_child_data(values.to_data())
.build()
.unwrap();
DictionaryArray::from(data)
}
/// Generates a `FixedSizeBinaryArray` of `len` entries with a random element
/// width in 0..20; each entry is valid with probability `valid_percent`.
fn generate_fixed_size_binary(
    rng: &mut impl RngCore,
    len: usize,
    valid_percent: f64,
) -> FixedSizeBinaryArray {
    let width = rng.random_range(0..20);
    let mut builder = FixedSizeBinaryBuilder::new(width);
    // One scratch buffer, refilled with fresh random bytes per valid entry.
    let mut scratch = vec![0; width as usize];
    for _ in 0..len {
        if rng.random_bool(valid_percent) {
            for byte in scratch.iter_mut() {
                *byte = rng.random();
            }
            builder.append_value(&scratch).unwrap();
        } else {
            builder.append_null();
        }
    }
    builder.finish()
}
/// Generates a `StructArray` with fields `a: Int32` and `b: Utf8`; the
/// struct-level validity and both children are each ~`valid_percent` valid.
fn generate_struct(rng: &mut impl RngCore, len: usize, valid_percent: f64) -> StructArray {
    // Draw the struct-level null mask first, then the two children, keeping
    // the RNG consumption order identical to the original formulation.
    let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
    let ints = generate_primitive_array::<Int32Type>(rng, len, valid_percent);
    let strings = generate_strings::<i32>(rng, len, valid_percent);
    let fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let children = vec![Arc::new(ints) as _, Arc::new(strings) as _];
    StructArray::new(fields, children, Some(nulls))
}
/// Generates a `ListArray` of `len` rows with random lengths in 0..10; the
/// `values` closure is invoked once with the total child length required.
/// The null mask is applied on top of otherwise well-formed offsets, so
/// null rows still cover valid (if unused) child ranges.
fn generate_list<R: RngCore, F>(
rng: &mut R,
len: usize,
valid_percent: f64,
values: F,
) -> ListArray
where
F: FnOnce(&mut R, usize) -> ArrayRef,
{
let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
// The final offset equals the total number of child values required.
let values_len = offsets.last().unwrap().to_usize().unwrap();
let values = values(rng, values_len);
let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
ListArray::new(field, offsets, values, Some(nulls))
}
/// Generates a `ListViewArray` of `len` rows with random sizes in 0..10.
/// Unlike `generate_list`, each row's offset is drawn independently, so
/// entries may overlap and share child values — a layout only list-views
/// can express.
fn generate_list_view<F>(
rng: &mut impl RngCore,
len: usize,
valid_percent: f64,
values: F,
) -> ListViewArray
where
F: FnOnce(usize) -> ArrayRef,
{
let sizes: Vec<i32> = (0..len).map(|_| rng.random_range(0..10)).collect();
// `.max(1)` keeps the child array non-empty even when every row is empty.
let values_len: usize = sizes.iter().map(|s| *s as usize).sum::<usize>().max(1);
let values = values(values_len);
// Each offset is drawn so that offset + size stays within the child array;
// empty rows always use offset 0.
let offsets: Vec<i32> = sizes
.iter()
.map(|&size| {
if size == 0 {
0
} else {
rng.random_range(0..=(values_len as i32 - size))
}
})
.collect();
let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
ListViewArray::new(
field,
ScalarBuffer::from(offsets),
ScalarBuffer::from(sizes),
values,
Some(nulls),
)
}
/// Produces a random validity mask of `len` bits, each valid with
/// probability 0.8, always wrapped in `Some`.
fn generate_nulls(rng: &mut impl RngCore, len: usize) -> Option<NullBuffer> {
    let bits = (0..len).map(|_| rng.random_bool(0.8));
    Some(NullBuffer::from_iter(bits))
}
/// Returns a copy of `array` in which the value stored *behind* each null
/// slot is perturbed (wrapping increment), while valid slots and the null
/// mask are untouched. Used to verify that row encoding is insensitive to
/// the arbitrary values under null slots.
///
/// Panics if the array has no null buffer.
fn change_underlying_null_values_for_primitive<T: ArrowPrimitiveType>(
array: &PrimitiveArray<T>,
) -> PrimitiveArray<T> {
let (dt, values, nulls) = array.clone().into_parts();
let new_values = ScalarBuffer::<T::Native>::from_iter(
values
.iter()
.zip(nulls.as_ref().unwrap().iter())
.map(|(val, is_valid)| {
if is_valid {
*val
} else {
// Any different value works; wrapping add avoids overflow panics.
val.add_wrapping(T::Native::usize_as(1))
}
}),
);
// Re-attach the original null mask and logical data type.
PrimitiveArray::new(new_values, nulls).with_data_type(dt)
}
/// Returns a copy of `array` in which every null slot's underlying byte
/// range is extended by one extra byte (`b'c'`), leaving valid slots and the
/// null mask unchanged. Logically equal to the input, but with different
/// data behind the nulls.
///
/// Panics if the array has no null buffer.
fn change_underline_null_values_for_byte_array<T: ByteArrayType>(
array: &GenericByteArray<T>,
) -> GenericByteArray<T> {
let (offsets, values, nulls) = array.clone().into_parts();
// Null slots grow by one byte; valid slots keep their original lengths.
let new_offsets = OffsetBuffer::<T::Offset>::from_lengths(
offsets
.lengths()
.zip(nulls.as_ref().unwrap().iter())
.map(|(len, is_valid)| if is_valid { len } else { len + 1 }),
);
// The last offset equals the total byte length of the new values buffer.
let mut new_bytes = Vec::<u8>::with_capacity(new_offsets[new_offsets.len() - 1].as_usize());
offsets
.windows(2)
.zip(nulls.as_ref().unwrap().iter())
.for_each(|(start_and_end, is_valid)| {
let start = start_and_end[0].as_usize();
let end = start_and_end[1].as_usize();
new_bytes.extend_from_slice(&values.as_slice()[start..end]);
if !is_valid {
// Extra byte behind the null slot to perturb its underlying data.
new_bytes.push(b'c');
}
});
GenericByteArray::<T>::new(new_offsets, Buffer::from_vec(new_bytes), nulls)
}
/// Returns a copy of `array` in which each null slot's underlying child
/// range is resized by one element (shrunk when it already touches the end
/// of the child array, otherwise grown), leaving valid slots and the null
/// mask unchanged. Logically equal to the input, but with different data
/// behind the nulls.
///
/// Panics if the array has no null buffer.
fn change_underline_null_values_for_list_array<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
) -> GenericListArray<O> {
let (field, offsets, values, nulls) = array.clone().into_parts();
let (new_values, new_offsets) = {
// Compute per-slot (start, length) ranges, perturbing the null slots,
// and materialize each as a slice of the original child array.
let concat_values = offsets
.windows(2)
.zip(nulls.as_ref().unwrap().iter())
.map(|(start_and_end, is_valid)| {
let start = start_and_end[0].as_usize();
let end = start_and_end[1].as_usize();
if is_valid {
return (start, end - start);
}
if end == values.len() {
// Cannot extend past the child array; shrink instead
// (saturating, in case the range is already empty).
(start, (end - start).saturating_sub(1))
} else {
(start, end - start + 1)
}
})
.map(|(start, length)| values.slice(start, length))
.collect::<Vec<_>>();
let new_offsets =
OffsetBuffer::<O>::from_lengths(concat_values.iter().map(|s| s.len()));
// Concatenate the per-slot slices into one contiguous child array.
let new_values = {
let values = concat_values.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
arrow_select::concat::concat(&values).expect("should be able to concat")
};
(new_values, new_offsets)
};
GenericListArray::<O>::new(field, new_offsets, new_values, nulls)
}
/// Dispatches to the type-specific helpers that rewrite the data hidden
/// behind null slots, returning a logically-equal array with different
/// underlying bytes. Arrays without nulls — and types with no helper —
/// are returned unchanged.
fn change_underline_null_values(array: &ArrayRef) -> ArrayRef {
if array.null_count() == 0 {
return Arc::clone(array);
}
// `downcast_primitive_array!` covers every primitive type in its first
// arm; the explicit arms handle the variable-width and list types.
downcast_primitive_array!(
array => {
let output = change_underlying_null_values_for_primitive(array);
Arc::new(output)
}
DataType::Utf8 => {
Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i32>()))
}
DataType::LargeUtf8 => {
Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i64>()))
}
DataType::Binary => {
Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i32>()))
}
DataType::LargeBinary => {
Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i64>()))
}
DataType::List(_) => {
Arc::new(change_underline_null_values_for_list_array(array.as_list::<i32>()))
}
DataType::LargeList(_) => {
Arc::new(change_underline_null_values_for_list_array(array.as_list::<i64>()))
}
_ => {
// No helper for this type; return the input untouched.
Arc::clone(array)
}
)
}
/// Generates one random column for the fuzz test, choosing uniformly among
/// 23 physical layouts: primitives, strings, dictionaries, fixed-size
/// binary, structs, lists (including a sliced variant), string/binary
/// views, booleans, and list-views (including a sliced variant). Roughly
/// 80% of slots are valid.
///
/// The list-view arms clone `rng` because the `values` closure captures the
/// original `rng` mutably, so the outer call needs its own generator.
fn generate_column(rng: &mut (impl RngCore + Clone), len: usize) -> ArrayRef {
    match rng.random_range(0..23) {
        0 => Arc::new(generate_primitive_array::<Int32Type>(rng, len, 0.8)),
        1 => Arc::new(generate_primitive_array::<UInt32Type>(rng, len, 0.8)),
        2 => Arc::new(generate_primitive_array::<Int64Type>(rng, len, 0.8)),
        3 => Arc::new(generate_primitive_array::<UInt64Type>(rng, len, 0.8)),
        4 => Arc::new(generate_primitive_array::<Float32Type>(rng, len, 0.8)),
        5 => Arc::new(generate_primitive_array::<Float64Type>(rng, len, 0.8)),
        6 => Arc::new(generate_strings::<i32>(rng, len, 0.8)),
        7 => {
            // Dictionary of string values. `random_range` panics on an
            // empty range, so clamp the bound: `1..len` would be empty for
            // len < 2. For len >= 2 this is identical to `1..len`.
            let dict_values_len = rng.random_range(1..len.max(2));
            let strings = Arc::new(generate_strings::<i32>(rng, dict_values_len, 1.0));
            Arc::new(generate_dictionary::<Int64Type>(rng, strings, len, 0.8))
        }
        8 => {
            // Dictionary of Int64 values; same empty-range guard as above.
            let dict_values_len = rng.random_range(1..len.max(2));
            let values = Arc::new(generate_primitive_array::<Int64Type>(
                rng,
                dict_values_len,
                1.0,
            ));
            Arc::new(generate_dictionary::<Int64Type>(rng, values, len, 0.8))
        }
        9 => Arc::new(generate_fixed_size_binary(rng, len, 0.8)),
        10 => Arc::new(generate_struct(rng, len, 0.8)),
        11 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
            Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
        })),
        12 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
            Arc::new(generate_strings::<i32>(rng, values_len, 0.8))
        })),
        13 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
            Arc::new(generate_struct(rng, values_len, 0.8))
        })),
        14 => Arc::new(generate_string_view(rng, len, 0.8)),
        15 => Arc::new(generate_byte_view(rng, len, 0.8)),
        16 => Arc::new(generate_fixed_stringview_column(len)),
        // Oversized list sliced down to `len` to exercise non-zero offsets.
        17 => Arc::new(
            generate_list(&mut rng.clone(), len + 1000, 0.8, |rng, values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
            })
            .slice(500, len),
        ),
        18 => Arc::new(generate_boolean_array(rng, len, 0.8)),
        19 => Arc::new(generate_list_view(
            &mut rng.clone(),
            len,
            0.8,
            |values_len| Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8)),
        )),
        20 => Arc::new(generate_list_view(
            &mut rng.clone(),
            len,
            0.8,
            |values_len| Arc::new(generate_strings::<i32>(rng, values_len, 0.8)),
        )),
        21 => Arc::new(generate_list_view(
            &mut rng.clone(),
            len,
            0.8,
            |values_len| Arc::new(generate_struct(rng, values_len, 0.8)),
        )),
        // Oversized list-view sliced down to `len`.
        22 => Arc::new(
            generate_list_view(&mut rng.clone(), len + 1000, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
            })
            .slice(500, len),
        ),
        _ => unreachable!(),
    }
}
/// Renders row `row` of the given sort columns as a comma-separated string,
/// printing "NULL" for invalid slots; used in fuzz-failure messages.
fn print_row(cols: &[SortColumn], row: usize) -> String {
    let mut rendered = Vec::with_capacity(cols.len());
    for col in cols {
        if col.values.is_valid(row) {
            let opts = FormatOptions::default().with_null("NULL");
            let formatter = ArrayFormatter::try_new(col.values.as_ref(), &opts).unwrap();
            rendered.push(formatter.value(row).to_string());
        } else {
            rendered.push("NULL".to_string());
        }
    }
    rendered.join(",")
}
/// Renders the data types of the given sort columns as a comma-separated
/// string; used in fuzz-failure messages.
fn print_col_types(cols: &[SortColumn]) -> String {
    cols.iter()
        .map(|col| col.values.data_type().to_string())
        .collect::<Vec<_>>()
        .join(",")
}
/// How `fuzz_test` treats the null buffers of its generated columns.
#[derive(Debug, PartialEq)]
enum Nulls {
    /// Keep whatever nulls the column generator produced.
    AsIs,
    /// Swap in a freshly generated null buffer.
    Different,
    /// Strip the null buffer entirely.
    None,
}
#[test]
#[cfg_attr(miri, ignore)]
fn fuzz_test() {
let mut rng = StdRng::seed_from_u64(42);
for _ in 0..100 {
for null_behavior in [Nulls::AsIs, Nulls::Different, Nulls::None] {
let num_columns = rng.random_range(1..5);
let len = rng.random_range(5..100);
let mut arrays: Vec<_> = (0..num_columns)
.map(|_| generate_column(&mut rng, len))
.collect();
match null_behavior {
Nulls::AsIs => {
}
Nulls::Different => {
arrays = arrays
.into_iter()
.map(|a| replace_array_nulls(a, generate_nulls(&mut rng, len)))
.collect()
}
Nulls::None => {
arrays = arrays
.into_iter()
.map(|a| replace_array_nulls(a, None))
.collect()
}
}
let options: Vec<_> = (0..num_columns)
.map(|_| SortOptions {
descending: rng.random_bool(0.5),
nulls_first: rng.random_bool(0.5),
})
.collect();
let sort_columns: Vec<_> = options
.iter()
.zip(&arrays)
.map(|(o, c)| SortColumn {
values: Arc::clone(c),
options: Some(*o),
})
.collect();
let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
let columns: Vec<SortField> = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
.collect();
let converter = RowConverter::new(columns).unwrap();
let rows = converter.convert_columns(&arrays).unwrap();
if !matches!(null_behavior, Nulls::None) {
assert_same_rows_when_changing_input_underlying_null_values(
&arrays, &converter, &rows,
);
}
for i in 0..len {
for j in 0..len {
let row_i = rows.row(i);
let row_j = rows.row(j);
let row_cmp = row_i.cmp(&row_j);
let lex_cmp = comparator.compare(i, j);
assert_eq!(
row_cmp,
lex_cmp,
"({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
print_row(&sort_columns, i),
print_row(&sort_columns, j),
row_i,
row_j,
print_col_types(&sort_columns)
);
}
}
{
let mut rows_iter = rows.iter();
let mut rows_lengths_iter = rows.lengths();
for (index, row) in rows_iter.by_ref().enumerate() {
let len = rows_lengths_iter
.next()
.expect("Reached end of length iterator while still have rows");
assert_eq!(
row.data.len(),
len,
"Row length mismatch: {} vs {}",
row.data.len(),
len
);
assert_eq!(
len,
rows.row_len(index),
"Row length mismatch at index {}: {} vs {}",
index,
len,
rows.row_len(index)
);
}
assert_eq!(
rows_lengths_iter.next(),
None,
"Length iterator did not reach end"
);
}
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
let rows = rows.try_into_binary().expect("reasonable size");
let parser = converter.parser();
let back = converter
.convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
.unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
let rows = converter.from_binary(rows);
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
}
}
}
/// Rebuilds `array` with its null buffer replaced by `new_nulls`.
fn replace_array_nulls(array: ArrayRef, new_nulls: Option<NullBuffer>) -> ArrayRef {
    let rebuilt = array
        .into_data()
        .into_builder()
        .nulls(new_nulls)
        .build()
        .unwrap();
    make_array(rebuilt)
}
/// Asserts that mutating the values hidden behind null slots leaves the
/// row encoding unchanged.
fn assert_same_rows_when_changing_input_underlying_null_values(
    arrays: &[ArrayRef],
    converter: &RowConverter,
    rows: &Rows,
) {
    let mutated: Vec<_> = arrays
        .iter()
        .map(|arr| change_underline_null_values(arr))
        .collect();
    // Nothing to verify if no array could be mutated (all returned as-is).
    let all_unchanged = arrays
        .iter()
        .zip(mutated.iter())
        .all(|(a, b)| Arc::ptr_eq(a, b));
    if all_unchanged {
        return;
    }
    let mutated_rows = converter.convert_columns(&mutated).unwrap();
    assert_eq!(
        rows.iter().collect::<Vec<_>>(),
        mutated_rows.iter().collect::<Vec<_>>(),
        "Different underlying nulls should not output different rows"
    )
}
#[test]
fn test_clear() {
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let mut rows = converter.empty_rows(3, 128);
    let arrays = [
        Arc::new(Int32Array::from(vec![None, Some(2), Some(4)])) as ArrayRef,
        Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef,
    ];
    // Reusing the same Rows buffer: clear + append must behave like a fresh one.
    for array in arrays.iter() {
        rows.clear();
        converter
            .append(&mut rows, std::slice::from_ref(array))
            .unwrap();
        let round_trip = converter.convert_rows(&rows).unwrap();
        assert_eq!(&round_trip[0], array);
    }
    // After the loop `rows` holds the second array; it must match a freshly
    // appended copy row by row.
    let mut rows_expected = converter.empty_rows(3, 128);
    converter.append(&mut rows_expected, &arrays[1..]).unwrap();
    for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
        assert_eq!(
            actual, expected,
            "For row {i}: expected {expected:?}, actual: {actual:?}",
        );
    }
}
#[test]
fn test_append_codec_dictionary_binary() {
    use DataType::*;
    // Dictionary<Int32, Binary> exercises the dictionary codec via append().
    let converter = RowConverter::new(vec![SortField::new(Dictionary(
        Box::new(Int32),
        Box::new(Binary),
    ))])
    .unwrap();
    let mut rows = converter.empty_rows(4, 128);
    let values: Vec<Option<&[u8]>> = vec![Some(b"a"), Some(b"b"), Some(b"c"), Some(b"d")];
    let dict_array = DictionaryArray::new(
        Int32Array::from_iter_values(0..4),
        Arc::new(BinaryArray::from(values)),
    );
    rows.clear();
    let array: ArrayRef = Arc::new(dict_array);
    converter
        .append(&mut rows, std::slice::from_ref(&array))
        .unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    dictionary_eq(&back[0], &array);
}
#[test]
fn test_list_prefix() {
    // A one-element list must sort before a two-element list sharing the
    // same prefix.
    let mut builder = ListBuilder::new(Int8Builder::new());
    builder.append_value([None]);
    builder.append_value([None, None]);
    let list = builder.finish();
    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(&[Arc::new(list) as _]).unwrap();
    assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
}
#[test]
fn map_should_be_marked_as_unsupported() {
    // supports_fields must report Map columns as unsupported.
    let map_type = Field::new_map(
        "map",
        "entries",
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
        false,
        true,
    )
    .data_type()
    .clone();
    assert!(
        !RowConverter::supports_fields(&[SortField::new(map_type)]),
        "Map should not be supported"
    );
}
#[test]
fn should_fail_to_create_row_converter_for_unsupported_map_type() {
    let map_type = Field::new_map(
        "map",
        "entries",
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
        false,
        true,
    )
    .data_type()
    .clone();
    // Constructing a converter over a Map column must surface a
    // NotYetImplemented error with the expected message.
    let outcome = RowConverter::new(vec![SortField::new(map_type)]);
    match outcome {
        Err(ArrowError::NotYetImplemented(message)) => {
            assert!(
                message.contains("Row format support not yet implemented for"),
                "Expected NotYetImplemented error for map data type, got: {message}",
            );
        }
        Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
        Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
    }
}
/// Rows produced directly by `convert_columns` can be decoded without UTF-8
/// validation; the asserts below show that this trusted path yields a
/// smaller (possibly empty) values buffer than the checked decode used for
/// rows that went through the opaque-binary parser.
#[test]
fn test_values_buffer_smaller_when_utf8_validation_disabled() {
    // Returns the first data-buffer length of the decoded Utf8View column
    // for (trusted decode, checked decode after a binary round trip).
    fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
        let rows = converter.convert_columns(&[col]).unwrap();
        // Trusted path: rows came straight from this converter.
        let converted = converter.convert_rows(&rows).unwrap();
        let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
        // Checked path: round trip through binary + parser.
        let rows = rows.try_into_binary().expect("reasonable size");
        let parser = converter.parser();
        let converted = converter
            .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
            .unwrap();
        let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
        (unchecked_values_len, checked_values_len)
    }
    // All strings <= 12 bytes: trusted decode leaves the buffer empty while
    // the checked decode stores all 5 + 5 + 4 = 14 bytes.
    let col = Arc::new(StringViewArray::from_iter([
        Some("hello"), None, Some("short"), Some("tiny"), ])) as ArrayRef;
    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
    assert_eq!(unchecked_values_len, 0);
    assert_eq!(checked_values_len, 14);
    // All strings exceed 12 bytes: both paths buffer them identically.
    let col = Arc::new(StringViewArray::from_iter([
        Some("this is a very long string over 12 bytes"),
        Some("another long string to test the buffer"),
    ])) as ArrayRef;
    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
    assert!(unchecked_values_len > 0);
    assert_eq!(unchecked_values_len, checked_values_len);
    // Mixed lengths: only the 13-byte string reaches the buffer on the
    // trusted path; the checked path buffers the short strings too.
    let col = Arc::new(StringViewArray::from_iter([
        Some("tiny"), Some("thisisexact13"), None,
        Some("short"), ])) as ArrayRef;
    let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
    assert_eq!(unchecked_values_len, 13);
    assert!(checked_values_len > unchecked_values_len);
}
#[test]
fn test_sparse_union() {
    // Sparse union: every child spans the full length of the array.
    let ints = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let strs = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
    let fields: UnionFields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    let union_arr = UnionArray::try_new(
        fields,
        vec![0, 1, 0, 1, 0].into(),
        None,
        vec![Arc::new(ints) as ArrayRef, Arc::new(strs)],
    )
    .unwrap();
    let converter =
        RowConverter::new(vec![SortField::new(union_arr.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone())])
        .unwrap();
    // Round-tripping must preserve length and per-row type ids.
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(union_arr.len(), back_union.len());
    for i in 0..union_arr.len() {
        assert_eq!(union_arr.type_id(i), back_union.type_id(i));
    }
}
#[test]
fn test_sparse_union_with_nulls() {
    // The string child is entirely null.
    let ints = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let strs = StringArray::from(vec![None::<&str>; 5]);
    let fields: UnionFields = [
        (0, Arc::new(Field::new("int", DataType::Int32, true))),
        (1, Arc::new(Field::new("str", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let union_arr = UnionArray::try_new(
        fields,
        vec![0, 1, 0, 1, 0].into(),
        None,
        vec![Arc::new(ints) as ArrayRef, Arc::new(strs)],
    )
    .unwrap();
    let converter =
        RowConverter::new(vec![SortField::new(union_arr.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone())])
        .unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(union_arr.len(), back_union.len());
    // Null-ness must round-trip; type ids only need to match for valid slots.
    for i in 0..union_arr.len() {
        let expected_null = union_arr.is_null(i);
        let actual_null = back_union.is_null(i);
        assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
        if !expected_null {
            assert_eq!(union_arr.type_id(i), back_union.type_id(i));
        }
    }
}
#[test]
fn test_dense_union() {
    // Dense union: children are compact and offsets map rows into them.
    let ints = Int32Array::from(vec![1, 3, 5]);
    let strs = StringArray::from(vec!["a", "b"]);
    let fields: UnionFields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    let union_arr = UnionArray::try_new(
        fields,
        vec![0, 1, 0, 1, 0].into(),
        Some(vec![0, 0, 1, 1, 2].into()),
        vec![Arc::new(ints) as ArrayRef, Arc::new(strs)],
    )
    .unwrap();
    let converter =
        RowConverter::new(vec![SortField::new(union_arr.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone())])
        .unwrap();
    // Round-tripping must preserve length and per-row type ids.
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(union_arr.len(), back_union.len());
    for i in 0..union_arr.len() {
        assert_eq!(union_arr.type_id(i), back_union.type_id(i));
    }
}
#[test]
fn test_dense_union_with_nulls() {
    // Both children contain nulls; offsets map rows into the compact children.
    let ints = Int32Array::from(vec![Some(1), None, Some(5)]);
    let strs = StringArray::from(vec![Some("a"), None]);
    let fields: UnionFields = [
        (0, Arc::new(Field::new("int", DataType::Int32, true))),
        (1, Arc::new(Field::new("str", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let union_arr = UnionArray::try_new(
        fields,
        vec![0, 1, 0, 1, 0].into(),
        Some(vec![0, 0, 1, 1, 2].into()),
        vec![Arc::new(ints) as ArrayRef, Arc::new(strs)],
    )
    .unwrap();
    let converter =
        RowConverter::new(vec![SortField::new(union_arr.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone())])
        .unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(union_arr.len(), back_union.len());
    // Null-ness must round-trip; type ids only need to match for valid slots.
    for i in 0..union_arr.len() {
        let expected_null = union_arr.is_null(i);
        let actual_null = back_union.is_null(i);
        assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
        if !expected_null {
            assert_eq!(union_arr.type_id(i), back_union.type_id(i));
        }
    }
}
#[test]
fn test_union_ordering() {
    // Logical rows: [int 100, str "z", int 5, str "a", int 20].
    let ints = Int32Array::from(vec![100, 5, 20]);
    let strs = StringArray::from(vec!["z", "a"]);
    let fields: UnionFields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();
    let union_arr = UnionArray::try_new(
        fields,
        vec![0, 1, 0, 1, 0].into(),
        Some(vec![0, 0, 1, 1, 2].into()),
        vec![Arc::new(ints) as ArrayRef, Arc::new(strs)],
    )
    .unwrap();
    let converter =
        RowConverter::new(vec![SortField::new(union_arr.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(&[Arc::new(union_arr)]).unwrap();
    // These comparisons show int rows ordering before string rows, with
    // values ordering naturally within each type.
    assert!(rows.row(2) < rows.row(1));
    assert!(rows.row(0) < rows.row(3));
    assert!(rows.row(2) < rows.row(4));
    assert!(rows.row(4) < rows.row(0));
    assert!(rows.row(3) < rows.row(1));
}
#[test]
fn test_row_converter_roundtrip_with_many_union_columns() {
    // Builds a two-row sparse (Int32 | Utf8) union: [int, string].
    fn sparse_union(int_val: i32, text: &str) -> (UnionFields, UnionArray) {
        let fields = UnionFields::try_new(
            vec![0, 1],
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("string", DataType::Utf8, true),
            ],
        )
        .unwrap();
        let array = UnionArray::try_new(
            fields.clone(),
            vec![0i8, 1].into(),
            None,
            vec![
                Arc::new(Int32Array::from(vec![Some(int_val), None])) as ArrayRef,
                Arc::new(StringArray::from(vec![None, Some(text)])) as ArrayRef,
            ],
        )
        .unwrap();
        (fields, array)
    }
    let (fields1, union_array1) = sparse_union(67, "hello");
    let (fields2, union_array2) = sparse_union(100, "world");
    // Two union sort fields in a single converter.
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Union(fields1, UnionMode::Sparse)),
        SortField::new(DataType::Union(fields2, UnionMode::Sparse)),
    ])
    .unwrap();
    let rows = converter
        .convert_columns(&[
            Arc::new(union_array1.clone()) as ArrayRef,
            Arc::new(union_array2.clone()) as ArrayRef,
        ])
        .unwrap();
    let out = converter.convert_rows(&rows).unwrap();
    let [col1, col2] = out.as_slice() else {
        panic!("expected 2 columns")
    };
    let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
    let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
    // Both columns must round-trip length, type ids, and values.
    for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
        assert_eq!(expected.len(), got.len());
        assert_eq!(expected.type_ids(), got.type_ids());
        for i in 0..expected.len() {
            assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
        }
    }
}
#[test]
fn test_row_converter_roundtrip_with_one_union_column() {
    let fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Field::new("int", DataType::Int32, true),
            Field::new("string", DataType::Utf8, true),
        ],
    )
    .unwrap();
    let union_arr = UnionArray::try_new(
        fields.clone(),
        vec![0i8, 1].into(),
        None,
        vec![
            Arc::new(Int32Array::from(vec![Some(67), None])) as ArrayRef,
            Arc::new(StringArray::from(vec![None::<&str>, Some("hello")])) as ArrayRef,
        ],
    )
    .unwrap();
    // A single sparse-union sort field.
    let converter = RowConverter::new(vec![SortField::new(DataType::Union(
        fields,
        UnionMode::Sparse,
    ))])
    .unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone()) as ArrayRef])
        .unwrap();
    let out = converter.convert_rows(&rows).unwrap();
    let [col1] = out.as_slice() else {
        panic!("expected 1 column")
    };
    // Length, type ids, and values must all round-trip.
    let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(col.len(), union_arr.len());
    assert_eq!(col.type_ids(), union_arr.type_ids());
    for i in 0..col.len() {
        assert_eq!(col.value(i).as_ref(), union_arr.value(i).as_ref());
    }
}
#[test]
fn test_row_converter_roundtrip_with_non_default_union_type_ids() {
    // Type ids need not be 0..n; 70 and 85 exercise the id mapping.
    let fields = UnionFields::try_new(
        vec![70, 85],
        vec![
            Field::new("int", DataType::Int32, true),
            Field::new("string", DataType::Utf8, true),
        ],
    )
    .unwrap();
    let union_arr = UnionArray::try_new(
        fields.clone(),
        vec![70i8, 85].into(),
        None,
        vec![
            Arc::new(Int32Array::from(vec![Some(67), None])) as ArrayRef,
            Arc::new(StringArray::from(vec![None::<&str>, Some("hello")])) as ArrayRef,
        ],
    )
    .unwrap();
    let converter = RowConverter::new(vec![SortField::new(DataType::Union(
        fields,
        UnionMode::Sparse,
    ))])
    .unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(union_arr.clone()) as ArrayRef])
        .unwrap();
    let out = converter.convert_rows(&rows).unwrap();
    let [col1] = out.as_slice() else {
        panic!("expected 1 column")
    };
    // The custom type ids must survive the round trip unchanged.
    let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(col.len(), union_arr.len());
    assert_eq!(col.type_ids(), union_arr.type_ids());
    for i in 0..col.len() {
        assert_eq!(col.value(i).as_ref(), union_arr.value(i).as_ref());
    }
}
#[test]
fn rows_size_should_count_for_capacity() {
    let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
    // Reported size of an empty Rows with the given preallocations.
    let size_for =
        |row_capacity, data_capacity| row_converter.empty_rows(row_capacity, data_capacity).size();
    let empty_rows_size_with_preallocate_rows_and_data = size_for(1000, 1000);
    let empty_rows_size_with_preallocate_rows = size_for(1000, 0);
    let empty_rows_size_with_preallocate_data = size_for(0, 1000);
    let empty_rows_size_without_preallocate = size_for(0, 0);
    // Preallocating both must dominate either single preallocation, and each
    // single preallocation must dominate preallocating nothing.
    assert!(
        empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
        "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
    );
    assert!(
        empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
        "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
    );
    assert!(
        empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
        "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
    );
    assert!(
        empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
        "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
    );
}
#[test]
fn test_struct_no_child_fields() {
    // Structs with zero child fields still carry a length and optional
    // validity; both must round-trip through the row format.
    let check = |array: ArrayRef| {
        let converter =
            RowConverter::new(vec![SortField::new(array.data_type().clone())]).unwrap();
        let rows = converter
            .convert_columns(std::slice::from_ref(&array))
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &array);
    };
    // No validity buffer.
    check(Arc::new(StructArray::new_empty_fields(5, None)) as ArrayRef);
    // Explicit validity buffer.
    check(Arc::new(StructArray::new_empty_fields(
        5,
        Some(vec![true, false, true, false, false].into()),
    )) as ArrayRef);
}
#[test]
fn reserve_should_increase_capacity_to_the_requested_size() {
    let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
    let mut empty_rows = row_converter.empty_rows(0, 0);
    empty_rows.reserve(50, 50);
    let baseline = empty_rows.size();
    // Re-reserving the same amount is a no-op.
    empty_rows.reserve(50, 50);
    assert_eq!(
        empty_rows.size(),
        baseline,
        "Size should not change when reserving already reserved space"
    );
    // A smaller request fits within the existing capacity.
    empty_rows.reserve(10, 20);
    assert_eq!(
        empty_rows.size(),
        baseline,
        "Size should not change when already have space for the expected reserved data"
    );
    // Growing the row count must grow the allocation.
    empty_rows.reserve(100, 20);
    assert!(
        empty_rows.size() > baseline,
        "Size should increase when reserving more space than previously reserved"
    );
    // Growing the data capacity must grow the allocation too.
    let baseline = empty_rows.size();
    empty_rows.reserve(20, 100);
    assert!(
        empty_rows.size() > baseline,
        "Size should increase when reserving more space than previously reserved"
    );
}
#[test]
fn empty_rows_should_return_empty_lengths_iterator() {
    // An empty Rows must yield an immediately exhausted lengths iterator.
    let converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
    let rows = converter.empty_rows(0, 0);
    assert_eq!(rows.lengths().next(), None);
}
#[test]
fn test_nested_null_list() {
    // List<Null> with lengths [1, 0, 2] must round-trip unchanged.
    let list: ArrayRef = Arc::new(ListArray::new(
        Arc::new(Field::new_list_field(DataType::Null, true)),
        OffsetBuffer::from_lengths(vec![1, 0, 2]),
        Arc::new(NullArray::new(3)),
        None,
    ));
    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(std::slice::from_ref(&list)).unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&list, &back[0]);
}
#[test]
fn test_double_nested_null_list() {
    // List<List<Null>> with a single row at each nesting level.
    let item_field = Arc::new(Field::new_list_field(DataType::Null, true));
    let inner = Arc::new(ListArray::new(
        item_field.clone(),
        OffsetBuffer::from_lengths(vec![1]),
        Arc::new(NullArray::new(1)),
        None,
    ));
    let list: ArrayRef = Arc::new(ListArray::new(
        Arc::new(Field::new_list_field(DataType::List(item_field), true)),
        OffsetBuffer::from_lengths(vec![1]),
        inner,
        None,
    ));
    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(std::slice::from_ref(&list)).unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&list, &back[0]);
}
#[test]
fn test_large_list_null() {
    // LargeList<Null> (64-bit offsets) with lengths [1, 0, 2].
    let list: ArrayRef = Arc::new(LargeListArray::new(
        Arc::new(Field::new_list_field(DataType::Null, true)),
        OffsetBuffer::from_lengths(vec![1, 0, 2]),
        Arc::new(NullArray::new(3)),
        None,
    ));
    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(std::slice::from_ref(&list)).unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&list, &back[0]);
}
#[test]
fn test_fixed_size_list_null() {
    // FixedSizeList<Null, 2> over six null child slots (three rows).
    let list: ArrayRef = Arc::new(FixedSizeListArray::new(
        Arc::new(Field::new_list_field(DataType::Null, true)),
        2,
        Arc::new(NullArray::new(6)),
        None,
    ));
    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(std::slice::from_ref(&list)).unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&list, &back[0]);
}
#[test]
fn test_list_null_variations() {
    // All variants share the List<Null> type, so one converter serves them all.
    let make_list = |lengths: Vec<usize>, total: usize, nulls: Option<NullBuffer>| -> ArrayRef {
        Arc::new(ListArray::new(
            Arc::new(Field::new_list_field(DataType::Null, true)),
            OffsetBuffer::from_lengths(lengths),
            Arc::new(NullArray::new(total)),
            nulls,
        ))
    };
    let first = make_list(vec![1, 0, 2], 3, None);
    let converter =
        RowConverter::new(vec![SortField::new(first.data_type().clone())]).unwrap();
    let round_trip = |list: &ArrayRef| {
        let rows = converter.convert_columns(std::slice::from_ref(list)).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(list, &back[0]);
    };
    // No validity buffer.
    round_trip(&first);
    // Explicit validity buffer over the same offsets.
    round_trip(&make_list(
        vec![1, 0, 2],
        3,
        Some(vec![true, false, true].into()),
    ));
    // Zero-row list array.
    round_trip(&make_list(vec![], 0, None));
    // Three rows, all empty lists.
    round_trip(&make_list(vec![0, 0, 0], 0, None));
}
#[test]
fn test_list_null_descending() {
    // List<Null> must also round-trip when the sort direction is descending.
    let list: ArrayRef = Arc::new(ListArray::new(
        Arc::new(Field::new_list_field(DataType::Null, true)),
        OffsetBuffer::from_lengths(vec![1, 0, 2]),
        Arc::new(NullArray::new(3)),
        None,
    ));
    let field = SortField::new_with_options(
        list.data_type().clone(),
        SortOptions::default().with_descending(true),
    );
    let converter = RowConverter::new(vec![field]).unwrap();
    let rows = converter.convert_columns(std::slice::from_ref(&list)).unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&list, &back[0]);
}
#[test]
fn test_struct_with_null_field() {
    // Struct { a: Null, b: Int32 } with validity [true, true, false].
    let struct_array: ArrayRef = Arc::new(StructArray::new(
        vec![
            Arc::new(Field::new("a", DataType::Null, true)),
            Arc::new(Field::new("b", DataType::Int32, true)),
        ]
        .into(),
        vec![
            Arc::new(NullArray::new(3)) as ArrayRef,
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        ],
        Some(vec![true, true, false].into()),
    ));
    let converter =
        RowConverter::new(vec![SortField::new(struct_array.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(std::slice::from_ref(&struct_array))
        .unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&struct_array, &back[0]);
}
#[test]
fn test_nested_struct_with_null() {
    // Inner struct carries a single Null-typed field; the outer struct nests
    // it alongside an Int32 field.
    let inner_struct = Arc::new(StructArray::new(
        vec![Arc::new(Field::new("x", DataType::Null, true))].into(),
        vec![Arc::new(NullArray::new(2)) as ArrayRef],
        None,
    ));
    let outer_struct: ArrayRef = Arc::new(StructArray::new(
        vec![
            Arc::new(Field::new("inner", inner_struct.data_type().clone(), true)),
            Arc::new(Field::new("y", DataType::Int32, true)),
        ]
        .into(),
        vec![
            inner_struct as ArrayRef,
            Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
        ],
        None,
    ));
    let converter =
        RowConverter::new(vec![SortField::new(outer_struct.data_type().clone())]).unwrap();
    let rows = converter
        .convert_columns(std::slice::from_ref(&outer_struct))
        .unwrap();
    let back = converter.convert_rows(&rows).unwrap();
    assert_eq!(&outer_struct, &back[0]);
}
#[test]
fn test_map_null_not_supported() {
    // Maps stay unsupported even when the value type is Null.
    let map_type = Field::new_map(
        "map",
        "entries",
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Null, true),
        false,
        true,
    )
    .data_type()
    .clone();
    let outcome = RowConverter::new(vec![SortField::new(map_type)]);
    assert!(
        outcome.is_err(),
        "Map should not be supported by RowConverter"
    );
    assert!(
        outcome
            .unwrap_err()
            .to_string()
            .contains("not yet implemented")
    );
}
#[test]
fn empty_row_iter_next_back() {
    let converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
    let rows = converter.empty_rows(0, 0);
    let mut rows_iter = rows.iter();
    // Repeated calls on an exhausted (empty) double-ended iterator stay None.
    for _ in 0..3 {
        assert_eq!(rows_iter.next_back(), None);
    }
}
#[test]
fn row_iter_next_back() {
    let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
    let mut rng = StdRng::seed_from_u64(42);
    let array = generate_primitive_array::<UInt8Type>(&mut rng, 100, 0.8);
    let rows = row_converter.convert_columns(&[Arc::new(array)]).unwrap();
    // Consume from the back, reversing each row's bytes as we go; reversing
    // the accumulation must reproduce the forward byte stream exactly.
    let mut rows_iter = rows.iter();
    let mut collected: Vec<u8> = Vec::new();
    while let Some(row) = rows_iter.next_back() {
        collected.extend(row.data.iter().rev());
    }
    collected.reverse();
    assert_eq!(
        collected,
        &rows.buffer.as_slice()[..*rows.offsets.last().unwrap()]
    );
    // Exhausted from the back means exhausted from the front too.
    assert_eq!(rows_iter.next_back(), None);
    assert_eq!(rows_iter.next(), None);
}
}