use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};
mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;
/// Generic builder for constructing Arrow [`RecordBatch`] readers over Parquet data.
///
/// `T` is the input source: a synchronous [`ChunkReader`] wrapper for the sync
/// reader, or an async wrapper elsewhere in the crate. Configuration accumulates
/// through the `with_*` methods before the final reader is built.
pub struct ArrowReaderBuilder<T> {
    /// The underlying data source being read
    pub(crate) input: T,
    /// Parsed Parquet footer metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// Resolved Arrow schema for the file
    pub(crate) schema: SchemaRef,
    /// Parquet-to-Arrow field mapping, if the file has any columns
    pub(crate) fields: Option<Arc<ParquetField>>,
    /// Maximum number of rows per emitted batch
    pub(crate) batch_size: usize,
    /// Specific row group indices to read; `None` means all row groups
    pub(crate) row_groups: Option<Vec<usize>>,
    /// Column projection applied to the output
    pub(crate) projection: ProjectionMask,
    /// Optional row-level predicate filter
    pub(crate) filter: Option<RowFilter>,
    /// Optional pre-computed row selection
    pub(crate) selection: Option<RowSelection>,
    /// Strategy for representing row selections (mask vs. selectors)
    pub(crate) row_selection_policy: RowSelectionPolicy,
    /// Maximum total rows to read, applied after filtering
    pub(crate) limit: Option<usize>,
    /// Rows to skip before reading, applied after filtering
    pub(crate) offset: Option<usize>,
    /// Metrics collection hook (disabled by default)
    pub(crate) metrics: ArrowReaderMetrics,
    /// Byte budget for caching predicate evaluation results
    pub(crate) max_predicate_cache_size: usize,
}
impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    /// Formats every configuration field of the builder.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            // Previously missing: keep the Debug output in sync with the struct
            .field("max_predicate_cache_size", &self.max_predicate_cache_size)
            .finish()
    }
}
impl<T> ArrowReaderBuilder<T> {
    /// Creates a builder with default configuration from pre-resolved metadata.
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            // Default predicate cache budget: 100 MiB
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns the parsed Parquet footer metadata.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the Parquet schema descriptor of the file.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the resolved Arrow schema.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Sets the maximum rows per batch, clamped to the file's row count.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        // Never request more rows per batch than the file contains
        self.batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        self
    }

    /// Restricts reading to the given row group indices.
    pub fn with_row_groups(mut self, row_groups: Vec<usize>) -> Self {
        self.row_groups = Some(row_groups);
        self
    }

    /// Restricts the columns read to those selected by `mask`.
    pub fn with_projection(mut self, mask: ProjectionMask) -> Self {
        self.projection = mask;
        self
    }

    /// Chooses how row selections are represented internally.
    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
        self.row_selection_policy = policy;
        self
    }

    /// Applies a pre-computed row selection.
    pub fn with_row_selection(mut self, selection: RowSelection) -> Self {
        self.selection = Some(selection);
        self
    }

    /// Applies a row-level predicate filter.
    pub fn with_row_filter(mut self, filter: RowFilter) -> Self {
        self.filter = Some(filter);
        self
    }

    /// Limits the total number of rows returned (applied after filtering).
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Skips the first `offset` rows (applied after filtering).
    pub fn with_offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }

    /// Installs a metrics collection hook.
    pub fn with_metrics(mut self, metrics: ArrowReaderMetrics) -> Self {
        self.metrics = metrics;
        self
    }

    /// Sets the byte budget for the predicate result cache.
    pub fn with_max_predicate_cache_size(mut self, max_predicate_cache_size: usize) -> Self {
        self.max_predicate_cache_size = max_predicate_cache_size;
        self
    }
}
/// Options controlling how Parquet metadata is read and how the Arrow schema
/// is resolved when constructing a reader.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// Ignore any Arrow schema embedded in the file's key-value metadata
    skip_arrow_metadata: bool,
    /// User-provided Arrow schema overriding the inferred one
    supplied_schema: Option<SchemaRef>,
    /// Policy for reading the per-page column index
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the per-page offset index
    pub(crate) offset_index: PageIndexPolicy,
    /// Fine-grained options forwarded to the metadata parser
    metadata_options: ParquetMetaDataOptions,
    /// Keys used to decrypt encrypted files
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    /// Virtual (computed) columns to expose alongside file columns
    virtual_columns: Vec<FieldRef>,
}
impl ArrowReaderOptions {
    /// Creates options with all defaults.
    pub fn new() -> Self {
        Self::default()
    }

    /// Controls whether embedded Arrow schema metadata is ignored.
    pub fn with_skip_arrow_metadata(mut self, skip_arrow_metadata: bool) -> Self {
        self.skip_arrow_metadata = skip_arrow_metadata;
        self
    }

    /// Supplies an explicit Arrow schema; implies skipping embedded Arrow metadata.
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.supplied_schema = Some(schema);
        self.skip_arrow_metadata = true;
        self
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        let policy = PageIndexPolicy::from(page_index);
        self.with_page_index_policy(policy)
    }

    /// Applies the same policy to both the column index and the offset index.
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_offset_index_policy(policy)
            .with_column_index_policy(policy)
    }

    /// Sets the policy for reading the column index.
    pub fn with_column_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            column_index: policy,
            ..self
        }
    }

    /// Sets the policy for reading the offset index.
    pub fn with_offset_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            offset_index: policy,
            ..self
        }
    }

    /// Supplies a known Parquet schema so it need not be re-parsed from the footer.
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Decodes page encoding statistics into a compact bitmask form.
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Sets the retention policy for page encoding statistics.
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Sets the retention policy for column (min/max/null) statistics.
    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    /// Sets the retention policy for size statistics (histograms, byte counts).
    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    /// Supplies decryption keys for reading encrypted files.
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        mut self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        self.file_decryption_properties = Some(file_decryption_properties);
        self
    }

    /// Registers virtual columns; each field must carry a virtual extension type.
    ///
    /// Returns an error for the first field that is not a virtual column.
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        if let Some(field) = virtual_columns.iter().find(|f| !is_virtual_column(f)) {
            return Err(ParquetError::General(format!(
                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                field.name()
            )));
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.column_index != PageIndexPolicy::Skip && self.offset_index != PageIndexPolicy::Skip
    }

    /// Returns the configured offset index policy.
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    /// Returns the configured column index policy.
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    /// Returns the metadata parser options.
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Returns the configured decryption keys, if any.
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}
/// The result of resolving Parquet footer metadata into an Arrow schema,
/// reusable across multiple readers over the same file.
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// Parsed Parquet footer metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// Resolved Arrow schema (inferred or user-supplied)
    pub(crate) schema: SchemaRef,
    /// Parquet-to-Arrow field mapping, if the file has any columns
    pub(crate) fields: Option<Arc<ParquetField>>,
}
impl ArrowReaderMetadata {
    /// Parses the footer from `reader` and resolves the Arrow schema per `options`.
    ///
    /// # Errors
    /// Returns an error if the footer cannot be parsed, decryption fails, or
    /// the schema cannot be resolved.
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Resolves the Arrow schema for already-parsed `metadata`.
    ///
    /// Uses the user-supplied schema when provided, otherwise infers one from
    /// the Parquet schema (optionally consulting embedded Arrow metadata).
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            // `supplied_schema` is owned by this match arm, so no clone is needed
            Some(supplied_schema) => {
                Self::with_supplied_schema(metadata, supplied_schema, &options.virtual_columns)
            }
            None => {
                // Only consult the embedded Arrow schema when not explicitly skipped
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };
                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;
                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    /// Validates a user-supplied Arrow schema against the schema inferred from
    /// the file, collecting every mismatch into a single error message.
    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        // The inferred field list includes virtual columns; account for them
        // when comparing against the supplied schema's length
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }
        // Collect all field-level mismatches rather than failing on the first
        let mut errors = Vec::new();
        let field_iter = supplied_schema.fields().iter().zip(fields.iter());
        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }
        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }
        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns the parsed Parquet footer metadata.
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the Parquet schema descriptor of the file.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the resolved Arrow schema.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
// Opaque wrapper marking the synchronous input source for the reader builder.
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);
impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    /// Formats as `SyncReader(<inner>)`, delegating to the wrapped reader.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Creates a builder, reading the Parquet footer from `reader` with default options.
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Creates a builder with explicit [`ArrowReaderOptions`].
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Creates a builder from already-resolved metadata, skipping footer parsing.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Reads the bloom filter for the given row group / column, if present.
    ///
    /// Returns `Ok(None)` when the column metadata records no bloom filter offset.
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };
        // When the total length is known, fetch header + bitset in one read;
        // otherwise read a header-sized estimate and fetch the bitset separately.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;
        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
        // The empty matches below are deliberate exhaustiveness checks: if a new
        // variant is added upstream, compilation fails here until it is handled.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
            }
        }
        let bitset = match column_metadata.bloom_filter_length() {
            // Whole filter already buffered: slice past the header.
            // `bitset_offset` is absolute in the file, so subtract `offset`.
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Consumes the builder, constructing the synchronous reader.
    ///
    /// Evaluates any row filter predicates eagerly to build the read plan.
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;
        // Clamp batch size to the file's total row count
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
        // Default to reading every row group in order
        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };
        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);
        // Evaluate each predicate in turn, progressively narrowing the selection
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Stop early once no rows remain selected
                if !plan_builder.selects_any() {
                    break;
                }
                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;
                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }
        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;
        // Apply limit/offset after predicate evaluation
        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();
        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}
/// A [`RowGroups`] implementation over a subset of a file's row groups,
/// backed by a shared synchronous reader.
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared handle to the underlying data source
    reader: Arc<T>,
    /// Footer metadata for the whole file
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to expose, in read order
    row_groups: Vec<usize>,
}
impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
fn num_rows(&self) -> usize {
let meta = self.metadata.row_groups();
self.row_groups
.iter()
.map(|x| meta[*x].num_rows() as usize)
.sum()
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
Ok(Box::new(ReaderPageIterator {
column_idx: i,
reader: self.reader.clone(),
metadata: self.metadata.clone(),
row_groups: self.row_groups.clone().into_iter(),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(
self.row_groups
.iter()
.map(move |i| self.metadata.row_group(*i)),
)
}
fn metadata(&self) -> &ParquetMetaData {
self.metadata.as_ref()
}
}
/// Iterator yielding one [`PageReader`] per remaining row group for a single column.
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared handle to the underlying data source
    reader: Arc<T>,
    /// Leaf column index being read
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// Footer metadata for the whole file
    metadata: Arc<ParquetMetaData>,
}
impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Constructs a page reader for this iterator's column within row group `rg_idx`,
    /// attaching page locations from the offset index (when available) and any
    /// decryption context.
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // An empty offset index for this row group means the index was not
        // loaded for it; treat that the same as having no index at all
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();
        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}
impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    /// Yields one page reader per remaining row group; `None` when exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        match self.next_page_reader(rg_idx) {
            Ok(page_reader) => Some(Ok(Box::new(page_reader) as _)),
            Err(e) => Some(Err(e)),
        }
    }
}
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
/// Synchronous reader yielding Arrow [`RecordBatch`]es decoded from Parquet.
pub struct ParquetRecordBatchReader {
    /// Root array reader producing a struct array per batch
    array_reader: Box<dyn ArrayReader>,
    /// Arrow schema of the emitted batches
    schema: SchemaRef,
    /// Batch size and row selection state driving the read loop
    read_plan: ReadPlan,
}
impl Debug for ParquetRecordBatchReader {
    /// Formats the reader; the array reader itself is elided as `"..."`
    /// since `dyn ArrayReader` has no Debug impl.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}
impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    /// Adapts `next_inner`'s `Result<Option<_>>` to the iterator's `Option<Result<_>>`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_inner() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(err) => Some(Err(err.into())),
        }
    }
}
impl ParquetRecordBatchReader {
    /// Core read loop: advances the array reader according to the read plan's
    /// row selection and materializes the next [`RecordBatch`], or `Ok(None)`
    /// at end of stream.
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        // A zero batch size (e.g. empty file) means there is nothing to read
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Selection represented as a boolean mask: decode whole chunks,
            // then filter out unselected rows
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };
                    // Skip leading unselected rows preceding this chunk
                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }
                    if mask_chunk.chunk_rows == 0 {
                        // Exhausted with nothing selected: end of stream
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }
                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }
                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;
                    // Keep only the rows the mask selects
                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }
                    // A fully filtered-out chunk yields no batch; keep reading
                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }
                    return Ok(Some(filtered_batch));
                }
            }
            // Selection represented as alternating skip/select runs
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;
                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }
                    if front.row_count == 0 {
                        continue;
                    }
                    // If the selector would overfill the batch, split it and
                    // return the remainder for the next call
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            // No selection: read a full batch unconditionally
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };
        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;
        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}
impl RecordBatchReader for ParquetRecordBatchReader {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl ParquetRecordBatchReader {
    /// Creates a reader over all columns and row groups of `reader`.
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Creates a reader directly from pre-computed field levels and a
    /// [`RowGroups`] source, bypassing the builder.
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();
        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Wraps a root [`ArrayReader`] and a [`ReadPlan`].
    ///
    /// The array reader's data type must be a struct; anything else is a
    /// programming error in reader construction.
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };
        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    /// Returns the configured batch size from the read plan.
    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}
#[cfg(test)]
pub(crate) mod tests {
use std::cmp::min;
use std::collections::{HashMap, VecDeque};
use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
use std::path::PathBuf;
use std::sync::Arc;
use rand::rngs::StdRng;
use rand::{Rng, RngCore, SeedableRng, random, rng};
use tempfile::tempfile;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::schema::{
add_encoded_arrow_schema_to_metadata,
virtual_type::{RowGroupIndex, RowNumber},
};
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
FloatType, Int32Type, Int64Type, Int96, Int96Type,
};
use crate::errors::Result;
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
use crate::util::test_common::rand_gen::RandGen;
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow_select::concat::concat_batches;
use bytes::Bytes;
use half::f16;
use num_traits::PrimInt;
#[test]
fn test_arrow_reader_all_columns() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let original_schema = Arc::clone(builder.schema());
let reader = builder.build().unwrap();
assert_eq!(original_schema.fields(), reader.schema().fields());
}
#[test]
fn test_reuse_schema() {
let file = get_test_file("parquet/alltypes-java.parquet");
let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
let expected = builder.metadata;
let schema = expected.file_metadata().schema_descr_ptr();
let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
let builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
}
#[test]
fn test_page_encoding_stats_mask() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
let file = File::open(path).unwrap();
let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
let builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
let row_group_metadata = builder.metadata.row_group(0);
let page_encoding_stats = row_group_metadata
.column(0)
.page_encoding_stats_mask()
.unwrap();
assert!(page_encoding_stats.is_only(Encoding::PLAIN));
let page_encoding_stats = row_group_metadata
.column(2)
.page_encoding_stats_mask()
.unwrap();
assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
}
#[test]
fn test_stats_stats_skipped() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
let file = File::open(path).unwrap();
let arrow_options = ArrowReaderOptions::new()
.with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
.with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
arrow_options,
)
.unwrap();
let row_group_metadata = builder.metadata.row_group(0);
for column in row_group_metadata.columns() {
assert!(column.page_encoding_stats().is_none());
assert!(column.page_encoding_stats_mask().is_none());
assert!(column.statistics().is_none());
}
let arrow_options = ArrowReaderOptions::new()
.with_encoding_stats_as_mask(true)
.with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
.with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
arrow_options,
)
.unwrap();
let row_group_metadata = builder.metadata.row_group(0);
for (idx, column) in row_group_metadata.columns().iter().enumerate() {
assert!(column.page_encoding_stats().is_none());
assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
assert_eq!(column.statistics().is_some(), idx == 0);
}
}
#[test]
fn test_size_stats_stats_skipped() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/repeated_primitive_no_list.parquet");
let file = File::open(path).unwrap();
let arrow_options =
ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
arrow_options,
)
.unwrap();
let row_group_metadata = builder.metadata.row_group(0);
for column in row_group_metadata.columns() {
assert!(column.repetition_level_histogram().is_none());
assert!(column.definition_level_histogram().is_none());
assert!(column.unencoded_byte_array_data_bytes().is_none());
}
let arrow_options = ArrowReaderOptions::new()
.with_encoding_stats_as_mask(true)
.with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
arrow_options,
)
.unwrap();
let row_group_metadata = builder.metadata.row_group(0);
for (idx, column) in row_group_metadata.columns().iter().enumerate() {
assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
}
}
#[test]
fn test_arrow_reader_single_column() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let original_schema = Arc::clone(builder.schema());
let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
let reader = builder.with_projection(mask).build().unwrap();
assert_eq!(1, reader.schema().fields().len());
assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
}
#[test]
fn test_arrow_reader_single_column_by_name() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let original_schema = Arc::clone(builder.schema());
let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
let reader = builder.with_projection(mask).build().unwrap();
assert_eq!(1, reader.schema().fields().len());
assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
}
#[test]
fn test_null_column_reader_test() {
let mut file = tempfile::tempfile().unwrap();
let schema = "
message message {
OPTIONAL INT32 int32;
}
";
let schema = Arc::new(parse_message_type(schema).unwrap());
let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
generate_single_column_file_with_data::<Int32Type>(
&[vec![], vec![]],
Some(&def_levels),
file.try_clone().unwrap(), schema,
Some(Field::new("int32", ArrowDataType::Null, true)),
&Default::default(),
)
.unwrap();
file.rewind().unwrap();
let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 4);
for batch in &batches[0..3] {
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.column(0).null_count(), 2);
}
assert_eq!(batches[3].num_rows(), 1);
assert_eq!(batches[3].num_columns(), 1);
assert_eq!(batches[3].column(0).null_count(), 1);
}
#[test]
fn test_primitive_single_column_reader_test() {
run_single_column_reader_tests::<BoolType, _, BoolType>(
2,
ConvertedType::NONE,
None,
|vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
&[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
);
run_single_column_reader_tests::<Int32Type, _, Int32Type>(
2,
ConvertedType::NONE,
None,
|vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
&[
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<Int64Type, _, Int64Type>(
2,
ConvertedType::NONE,
None,
|vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
&[
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<FloatType, _, FloatType>(
2,
ConvertedType::NONE,
None,
|vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
&[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
);
}
#[test]
fn test_unsigned_primitive_single_column_reader_test() {
run_single_column_reader_tests::<Int32Type, _, Int32Type>(
2,
ConvertedType::UINT_32,
Some(ArrowDataType::UInt32),
|vals| {
Arc::new(UInt32Array::from_iter(
vals.iter().map(|x| x.map(|x| x as u32)),
))
},
&[
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
],
);
run_single_column_reader_tests::<Int64Type, _, Int64Type>(
2,
ConvertedType::UINT_64,
Some(ArrowDataType::UInt64),
|vals| {
Arc::new(UInt64Array::from_iter(
vals.iter().map(|x| x.map(|x| x as u64)),
))
},
&[
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
],
);
}
#[test]
fn test_unsigned_roundtrip() {
let schema = Arc::new(Schema::new(vec![
Field::new("uint32", ArrowDataType::UInt32, true),
Field::new("uint64", ArrowDataType::UInt64, true),
]));
let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
let original = RecordBatch::try_new(
schema,
vec![
Arc::new(UInt32Array::from_iter_values([
0,
i32::MAX as u32,
u32::MAX,
])),
Arc::new(UInt64Array::from_iter_values([
0,
i64::MAX as u64,
u64::MAX,
])),
],
)
.unwrap();
writer.write(&original).unwrap();
writer.close().unwrap();
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
let ret = reader.next().unwrap().unwrap();
assert_eq!(ret, original);
ret.column(0)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
ret.column(1)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
}
#[test]
fn test_float16_roundtrip() -> Result<()> {
    // Round-trip special f16 values (NaN, infinities, signed zeros, common
    // constants) through both a required and a nullable column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("float16", ArrowDataType::Float16, false),
        Field::new("float16-nullable", ArrowDataType::Float16, true),
    ]));
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
    let original = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float16Array::from_iter_values([
                f16::EPSILON,
                f16::MIN,
                f16::MAX,
                f16::NAN,
                f16::INFINITY,
                f16::NEG_INFINITY,
                f16::ONE,
                f16::NEG_ONE,
                f16::ZERO,
                f16::NEG_ZERO,
                f16::E,
                f16::PI,
                f16::FRAC_1_PI,
            ])),
            // Nullable column interleaves nulls with the non-finite values.
            Arc::new(Float16Array::from(vec![
                None,
                None,
                None,
                Some(f16::NAN),
                Some(f16::INFINITY),
                Some(f16::NEG_INFINITY),
                None,
                None,
                None,
                None,
                None,
                None,
                Some(f16::FRAC_1_PI),
            ])),
        ],
    )?;
    writer.write(&original)?;
    writer.close()?;
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let ret = reader.next().unwrap()?;
    assert_eq!(ret, original);
    // Verify the concrete array types; values were compared above.
    ret.column(0).as_primitive::<Float16Type>();
    ret.column(1).as_primitive::<Float16Type>();
    Ok(())
}
#[test]
fn test_time_utc_roundtrip() -> Result<()> {
    // Round-trip Time32/Time64 columns carrying an "adjusted_to_utc" field
    // metadata entry; values include out-of-range times (negative, and at or
    // beyond 24h) which must still round-trip unchanged.
    // NOTE(review): the metadata key presumably maps onto the parquet TIME
    // logical type's isAdjustedToUTC flag — confirm against the writer.
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "time_millis",
            ArrowDataType::Time32(TimeUnit::Millisecond),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "adjusted_to_utc".to_string(),
            "".to_string(),
        )])),
        Field::new(
            "time_micros",
            ArrowDataType::Time64(TimeUnit::Microsecond),
            true,
        )
        .with_metadata(HashMap::from_iter(vec![(
            "adjusted_to_utc".to_string(),
            "".to_string(),
        )])),
    ]));
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
    let original = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Time32MillisecondArray::from(vec![
                Some(-1),
                Some(0),
                Some(86_399_000),
                Some(86_400_000),
                Some(86_401_000),
                None,
            ])),
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(-1),
                Some(0),
                Some(86_399 * 1_000_000),
                Some(86_400 * 1_000_000),
                Some(86_401 * 1_000_000),
                None,
            ])),
        ],
    )?;
    writer.write(&original)?;
    writer.close()?;
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let ret = reader.next().unwrap()?;
    assert_eq!(ret, original);
    // Verify the concrete array types; values were compared above.
    ret.column(0).as_primitive::<Time32MillisecondType>();
    ret.column(1).as_primitive::<Time64MicrosecondType>();
    Ok(())
}
#[test]
fn test_date32_roundtrip() -> Result<()> {
    use arrow_array::Date32Array;
    // A single required Date32 column covering a symmetric range of day
    // offsets around the epoch.
    let field = Field::new("date32", ArrowDataType::Date32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
    let days = vec![
        -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
    ];
    let original = RecordBatch::try_new(schema, vec![Arc::new(Date32Array::from(days))])?;
    writer.write(&original)?;
    writer.close()?;
    // Read everything back and verify both the values and the concrete
    // array type.
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let round_tripped = reader.next().unwrap()?;
    assert_eq!(round_tripped, original);
    round_tripped.column(0).as_primitive::<Date32Type>();
    Ok(())
}
#[test]
fn test_date64_roundtrip() -> Result<()> {
    // Write three Date64 columns twice — once with default properties and
    // once with `coerce_types` enabled — and compare what survives:
    //  * "small-date64": exact day multiples within i32-day range
    //  * "big-date64": exact day multiples beyond i32-day range
    //  * "invalid-date64": values that are not whole-day multiples
    use arrow_array::Date64Array;
    let schema = Arc::new(Schema::new(vec![
        Field::new("small-date64", ArrowDataType::Date64, false),
        Field::new("big-date64", ArrowDataType::Date64, false),
        Field::new("invalid-date64", ArrowDataType::Date64, false),
    ]));
    let mut default_buf = Vec::with_capacity(1024);
    let mut coerce_buf = Vec::with_capacity(1024);
    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
    let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
    let mut coerce_writer =
        ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
    static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
    let original = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Date64Array::from(vec![
                -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                -1_000 * NUM_MILLISECONDS_IN_DAY,
                0,
                1_000 * NUM_MILLISECONDS_IN_DAY,
                1_000_000 * NUM_MILLISECONDS_IN_DAY,
            ])),
            Arc::new(Date64Array::from(vec![
                -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                0,
                1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
            ])),
            // Offset by +1ms so these are not representable as whole days.
            Arc::new(Date64Array::from(vec![
                -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                1,
                1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
            ])),
        ],
    )?;
    default_writer.write(&original)?;
    coerce_writer.write(&original)?;
    default_writer.close()?;
    coerce_writer.close()?;
    let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
    let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
    let default_ret = default_reader.next().unwrap()?;
    let coerce_ret = coerce_reader.next().unwrap()?;
    // Without coercion everything round-trips; with coercion only the
    // in-range, whole-day column is preserved losslessly.
    assert_eq!(default_ret, original);
    assert_eq!(coerce_ret.column(0), original.column(0));
    assert_ne!(coerce_ret.column(1), original.column(1));
    assert_ne!(coerce_ret.column(2), original.column(2));
    // Verify the concrete array type of the surviving column.
    default_ret.column(0).as_primitive::<Date64Type>();
    coerce_ret.column(0).as_primitive::<Date64Type>();
    Ok(())
}
struct RandFixedLenGen {}
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
fn r#gen(len: i32) -> FixedLenByteArray {
let mut v = vec![0u8; len as usize];
rng().fill_bytes(&mut v);
ByteArray::from(v).into()
}
}
#[test]
fn test_fixed_length_binary_column_reader() {
    // FIXED_LEN_BYTE_ARRAY(20) values must read back as a
    // FixedSizeBinary(20) Arrow array (the `20` here doubles as the FLBA
    // length via `single_column_reader_test`'s `rand_max` parameter).
    run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
        20,
        ConvertedType::NONE,
        None,
        |vals| {
            let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
            for val in vals {
                match val {
                    Some(b) => builder.append_value(b).unwrap(),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        },
        &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    );
}
#[test]
fn test_interval_day_time_column_reader() {
    // 12-byte FLBA with the INTERVAL converted type maps to Arrow
    // IntervalDayTime: days come from bytes [4..8] and milliseconds from
    // bytes [8..12] (little-endian). The leading 4 bytes — the months field
    // of the parquet INTERVAL layout — are dropped, as IntervalDayTime has
    // no months component.
    run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
        12,
        ConvertedType::INTERVAL,
        None,
        |vals| {
            Arc::new(
                vals.iter()
                    .map(|x| {
                        x.as_ref().map(|b| IntervalDayTime {
                            days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                            milliseconds: i32::from_le_bytes(
                                b.as_ref()[8..12].try_into().unwrap(),
                            ),
                        })
                    })
                    .collect::<IntervalDayTimeArray>(),
            )
        },
        &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
    );
}
#[test]
fn test_int96_single_column_reader_test() {
    // INT96 timestamps can be read at several resolutions depending on the
    // Arrow type hint: no hint defaults to nanoseconds; explicit hints pick
    // seconds/millis/micros/nanos; the last entry also checks a timezone is
    // carried through.
    let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
    // Pairs of (optional Arrow type hint, expected-array builder).
    type TypeHintAndConversionFunction =
        (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
    let resolutions: Vec<TypeHintAndConversionFunction> = vec![
        // No hint: defaults to nanosecond resolution.
        (None, |vals: &[Option<Int96>]| {
            Arc::new(TimestampNanosecondArray::from_iter(
                vals.iter().map(|x| x.map(|x| x.to_nanos())),
            )) as ArrayRef
        }),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampSecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_seconds())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampMillisecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_millis())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampMicrosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_micros())),
                )) as ArrayRef
            },
        ),
        (
            Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
            |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            },
        ),
        // A hinted timezone must survive the read.
        (
            Some(ArrowDataType::Timestamp(
                TimeUnit::Second,
                Some(Arc::from("-05:00")),
            )),
            |vals: &[Option<Int96>]| {
                Arc::new(
                    TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )
                    .with_timezone("-05:00"),
                ) as ArrayRef
            },
        ),
    ];
    resolutions.iter().for_each(|(arrow_type, converter)| {
        run_single_column_reader_tests::<Int96Type, _, Int96Type>(
            2,
            ConvertedType::NONE,
            arrow_type.clone(),
            converter,
            encodings,
        );
    })
}
#[test]
fn test_int96_from_spark_file_with_provided_schema() {
    // Reading a Spark-written INT96 file with a user-supplied microsecond
    // schema: all values — including ones that would overflow nanoseconds —
    // must come back at microsecond resolution.
    use arrow_schema::DataType::Timestamp;
    let test_data = arrow::util::test_util::parquet_test_data();
    let path = format!("{test_data}/int96_from_spark.parquet");
    let file = File::open(path).unwrap();
    let supplied_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        Timestamp(TimeUnit::Microsecond, None),
        true,
    )]));
    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut record_reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .build()
            .unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 1);
    let column = batch.column(0);
    assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
    // Expected raw microsecond values for the file's six rows (one null).
    let expected = Arc::new(Int64Array::from(vec![
        Some(1704141296123456),
        Some(1704070800000000),
        Some(253402225200000000),
        Some(1735599600000000),
        None,
        Some(9089380393200000000),
    ]));
    // Cast to Int64 so the raw epoch values can be compared directly.
    let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let casted_timestamps = binding.as_primitive::<types::Int64Type>();
    assert_eq!(casted_timestamps.len(), expected.len());
    casted_timestamps
        .iter()
        .zip(expected.iter())
        .for_each(|(lhs, rhs)| {
            assert_eq!(lhs, rhs);
        });
}
#[test]
fn test_int96_from_spark_file_without_provided_schema() {
    // Without a supplied schema the same Spark INT96 file defaults to
    // nanosecond resolution.
    use arrow_schema::DataType::Timestamp;
    let test_data = arrow::util::test_util::parquet_test_data();
    let path = format!("{test_data}/int96_from_spark.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 1);
    let column = batch.column(0);
    assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
    let expected = Arc::new(Int64Array::from(vec![
        Some(1704141296123456000),
        Some(1704070800000000000),
        // Timestamps outside the representable i64 nanosecond range appear
        // as wrapped (negative) values — compare with the in-range readings
        // in the with-provided-schema variant of this test. NOTE(review):
        // presumably documenting intended wrap-around behavior; confirm.
        Some(-4852191831933722624),
        Some(1735599600000000000),
        None,
        Some(-4864435138808946688),
    ]));
    // Cast to Int64 so the raw epoch values can be compared directly.
    let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let casted_timestamps = binding.as_primitive::<types::Int64Type>();
    assert_eq!(casted_timestamps.len(), expected.len());
    casted_timestamps
        .iter()
        .zip(expected.iter())
        .for_each(|(lhs, rhs)| {
            assert_eq!(lhs, rhs);
        });
}
struct RandUtf8Gen {}
impl RandGen<ByteArrayType> for RandUtf8Gen {
fn r#gen(len: i32) -> ByteArray {
Int32Type::r#gen(len).to_string().as_str().into()
}
}
#[test]
fn test_utf8_single_column_reader_test() {
    // Byte-array columns read back as Binary, Utf8, LargeUtf8 and as
    // dictionaries keyed by every integer key type, across all byte-array
    // encodings.
    fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
        Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
            x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
        })))
    }
    let encodings = &[
        Encoding::PLAIN,
        Encoding::RLE_DICTIONARY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
        Encoding::DELTA_BYTE_ARRAY,
    ];
    // No converted type: values come back as plain binary.
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::NONE,
        None,
        |vals| {
            Arc::new(BinaryArray::from_iter(
                vals.iter().map(|x| x.as_ref().map(|x| x.data())),
            ))
        },
        encodings,
    );
    // UTF8 converted type with no hint, a Utf8 hint and a LargeUtf8 hint.
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        None,
        string_converter::<i32>,
        encodings,
    );
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        Some(ArrowDataType::Utf8),
        string_converter::<i32>,
        encodings,
    );
    run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
        2,
        ConvertedType::UTF8,
        Some(ArrowDataType::LargeUtf8),
        string_converter::<i64>,
        encodings,
    );
    // 8-bit dictionary keys can only index a small value space, so run a
    // single reduced-size configuration instead of the full option matrix.
    let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
    for key in &small_key_types {
        for encoding in encodings {
            let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
            opts.encoding = *encoding;
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
            single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                opts,
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    // Build the plain string array, then cast it into the
                    // dictionary type under test.
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
            );
        }
    }
    // Wider key types run the full matrix for both Utf8 and LargeUtf8
    // dictionary value types.
    let key_types = [
        ArrowDataType::Int16,
        ArrowDataType::UInt16,
        ArrowDataType::Int32,
        ArrowDataType::UInt32,
        ArrowDataType::Int64,
        ArrowDataType::UInt64,
    ];
    for key in &key_types {
        let data_type =
            ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(data_type.clone()),
            move |vals| {
                let vals = string_converter::<i32>(vals);
                arrow::compute::cast(&vals, &data_type).unwrap()
            },
            encodings,
        );
        let data_type = ArrowDataType::Dictionary(
            Box::new(key.clone()),
            Box::new(ArrowDataType::LargeUtf8),
        );
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(data_type.clone()),
            move |vals| {
                let vals = string_converter::<i64>(vals);
                arrow::compute::cast(&vals, &data_type).unwrap()
            },
            encodings,
        );
    }
}
#[test]
fn test_decimal_nullable_struct() {
    // A nullable struct wrapping a required Decimal256 child: struct-level
    // nulls (bitmap 0b11101111 marks row 4 null) must round-trip even
    // though the child column itself is non-nullable.
    let decimals = Decimal256Array::from_iter_values(
        [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
    );
    let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
        "decimals",
        decimals.data_type().clone(),
        false,
    )])))
    .len(8)
    .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
    .child_data(vec![decimals.into_data()])
    .build()
    .unwrap();
    let written =
        RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
            .unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();
    // A batch size of 3 splits the 8 rows into batches of 3, 3 and 2.
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(&written.slice(0, 3), &read[0]);
    assert_eq!(&written.slice(3, 3), &read[1]);
    assert_eq!(&written.slice(6, 2), &read[2]);
}
#[test]
fn test_int32_nullable_struct() {
    // Same shape as `test_decimal_nullable_struct` but with an Int32 child:
    // a nullable struct (bitmap 0b11101111 marks row 4 null) over a
    // required child column.
    let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
    let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
        "int32",
        int32.data_type().clone(),
        false,
    )])))
    .len(8)
    .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
    .child_data(vec![int32.into_data()])
    .build()
    .unwrap();
    let written =
        RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
            .unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();
    // A batch size of 3 splits the 8 rows into batches of 3, 3 and 2.
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(&written.slice(0, 3), &read[0]);
    assert_eq!(&written.slice(3, 3), &read[1]);
    assert_eq!(&written.slice(6, 2), &read[2]);
}
#[test]
fn test_decimal_list() {
    // A nullable list of required Decimal128 values: 7 list rows over 8
    // child values, with empty lists (adjacent equal offsets) and null
    // rows (bitmap 0b01010111 marks rows 3 and 5 null).
    let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
    let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
        decimals.data_type().clone(),
        false,
    ))))
    .len(7)
    .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
    .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
    .child_data(vec![decimals.into_data()])
    .build()
    .unwrap();
    let written =
        RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
            .unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();
    // A batch size of 3 splits the 7 rows into batches of 3, 3 and 1.
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(&written.slice(0, 3), &read[0]);
    assert_eq!(&written.slice(3, 3), &read[1]);
    assert_eq!(&written.slice(6, 1), &read[2]);
}
#[test]
fn test_read_decimal_file() {
    use arrow_array::Decimal128Array;
    // Each test file stores the same 24 values (1.00..=24.00, scale 2) with
    // a different physical representation; the expected precision varies
    // per file.
    let testdata = arrow::util::test_util::parquet_test_data();
    let file_variants = vec![
        ("byte_array", 4),
        ("fixed_length", 25),
        ("int32", 4),
        ("int64", 10),
    ];
    for (prefix, target_precision) in file_variants {
        let file = File::open(format!("{testdata}/{prefix}_decimal.parquet")).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 24);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(col.precision(), target_precision);
        assert_eq!(col.scale(), 2);
        // Unscaled values: row i holds (i + 1) * 100.
        for (i, expected) in (1..25).enumerate() {
            assert_eq!(col.value(i), expected * 100_i128);
        }
    }
}
#[test]
fn test_read_float16_nonzeros_file() {
    // Reads float16_nonzeros_and_nans.parquet and checks the eight rows
    // [null, 1, -2, NaN, +0, -1, -0, 2], including the signs of the zeros.
    use arrow_array::Float16Array;
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 8);
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();
    // f16 has no `2.0` constant; build it from ONE.
    let f16_two = f16::ONE + f16::ONE;
    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    assert_eq!(col.value(1), f16::ONE);
    assert_eq!(col.value(2), -f16_two);
    assert!(col.value(3).is_nan());
    assert_eq!(col.value(4), f16::ZERO);
    // Zero compares equal to negative zero, so also check the sign bit.
    assert!(col.value(4).is_sign_positive());
    assert_eq!(col.value(5), f16::NEG_ONE);
    assert_eq!(col.value(6), f16::NEG_ZERO);
    assert!(col.value(6).is_sign_negative());
    assert_eq!(col.value(7), f16_two);
}
#[test]
fn test_read_float16_zeros_file() {
    use arrow_array::Float16Array;
    // float16_zeros_and_nans.parquet holds exactly [null, +0.0, NaN].
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/float16_zeros_and_nans.parquet")).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 3);
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float16Array>()
        .unwrap();
    assert_eq!(col.null_count(), 1);
    assert!(col.is_null(0));
    // Zero compares equal to negative zero, so verify the sign bit too.
    assert_eq!(col.value(1), f16::ZERO);
    assert!(col.value(1).is_sign_positive());
    assert!(col.value(2).is_nan());
}
#[test]
fn test_read_float32_float64_byte_stream_split() {
    // byte_stream_split.zstd.parquet holds 300 rows of f32/f64 pairs, each
    // strictly inside the open interval (-10, 10).
    let path = format!(
        "{}/byte_stream_split.zstd.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 128).unwrap();
    let mut total_rows = 0;
    for batch in reader {
        let batch = batch.unwrap();
        total_rows += batch.num_rows();
        // Both columns are BYTE_STREAM_SPLIT encoded; sanity-check ranges.
        for &v in batch.column(0).as_primitive::<Float32Type>().values() {
            assert!(v > -10.0 && v < 10.0);
        }
        for &v in batch.column(1).as_primitive::<Float64Type>().values() {
            assert!(v > -10.0 && v < 10.0);
        }
    }
    assert_eq!(total_rows, 300);
}
#[test]
fn test_read_extended_byte_stream_split() {
    // byte_stream_split_extended.gzip.parquet pairs each plainly-encoded
    // column with a BYTE_STREAM_SPLIT-encoded copy of the same data:
    // (0,1)=f16, (2,3)=f32, (4,5)=f64, (6,7)=i32, (8,9)=i64, (10,11)=FLBA,
    // (12,13)=decimal128. Each pair must decode to identical values.
    let path = format!(
        "{}/byte_stream_split_extended.gzip.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();
    let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
    let mut row_count = 0;
    for batch in record_reader {
        let batch = batch.unwrap();
        row_count += batch.num_rows();
        let f16_col = batch.column(0).as_primitive::<Float16Type>();
        let f16_bss = batch.column(1).as_primitive::<Float16Type>();
        assert_eq!(f16_col.len(), f16_bss.len());
        f16_col
            .iter()
            .zip(f16_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let f32_col = batch.column(2).as_primitive::<Float32Type>();
        let f32_bss = batch.column(3).as_primitive::<Float32Type>();
        assert_eq!(f32_col.len(), f32_bss.len());
        f32_col
            .iter()
            .zip(f32_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let f64_col = batch.column(4).as_primitive::<Float64Type>();
        let f64_bss = batch.column(5).as_primitive::<Float64Type>();
        assert_eq!(f64_col.len(), f64_bss.len());
        f64_col
            .iter()
            .zip(f64_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
        let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
        assert_eq!(i32_col.len(), i32_bss.len());
        i32_col
            .iter()
            .zip(i32_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
        let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
        assert_eq!(i64_col.len(), i64_bss.len());
        i64_col
            .iter()
            .zip(i64_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let flba_col = batch.column(10).as_fixed_size_binary();
        let flba_bss = batch.column(11).as_fixed_size_binary();
        assert_eq!(flba_col.len(), flba_bss.len());
        flba_col
            .iter()
            .zip(flba_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
        let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
        assert_eq!(dec_col.len(), dec_bss.len());
        dec_col
            .iter()
            .zip(dec_bss.iter())
            .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
    }
    assert_eq!(row_count, 200);
}
#[test]
fn test_read_incorrect_map_schema_file() {
    // incorrect_map_schema.parquet contains a map whose schema does not use
    // the spec-compliant naming; the reader must still normalize it to the
    // standard key_value/key/value structure. NOTE(review): the exact
    // deviation in the file isn't visible here — it's implied by the
    // file name and the normalized expectation below.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/incorrect_map_schema.parquet");
    let file = File::open(path).unwrap();
    let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    // Expected normalized schema: map -> non-null "key_value" struct with a
    // required utf8 key and nullable utf8 value.
    let expected_schema = Schema::new(vec![Field::new(
        "my_map",
        ArrowDataType::Map(
            Arc::new(Field::new(
                "key_value",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("key", ArrowDataType::Utf8, false),
                    Field::new("value", ArrowDataType::Utf8, true),
                ])),
                false,
            )),
            false,
        ),
        true,
    )]);
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.column(0).null_count(), 0);
    // The single map row holds {"parent": "another", "name": "report"}.
    assert_eq!(
        batch.column(0).as_map().keys().as_ref(),
        &StringArray::from(vec!["parent", "name"])
    );
    assert_eq!(
        batch.column(0).as_map().values().as_ref(),
        &StringArray::from(vec!["another", "report"])
    );
}
#[test]
fn test_read_dict_fixed_size_binary() {
    // Round-trip a Dictionary(UInt8, FixedSizeBinary(8)) column: two
    // distinct 8-byte values referenced by keys [0, 0, 1].
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt8),
            Box::new(ArrowDataType::FixedSizeBinary(8)),
        ),
        true,
    )]));
    let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
    let values = FixedSizeBinaryArray::try_from_iter(
        vec![
            (0u8..8u8).collect::<Vec<u8>>(),
            (24u8..32u8).collect::<Vec<u8>>(),
        ]
        .into_iter(),
    )
    .unwrap();
    let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    // All 3 rows fit in one batch; the read batch must equal the written one.
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(read.len(), 1);
    assert_eq!(&batch, &read[0])
}
#[test]
fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
    // A nullable struct whose FIRST child is a string dictionary: the
    // struct-level null (row 3) must round-trip while the child arrays
    // carry placeholder values ("quebec" key 0 and "") at that row.
    let struct_fields = Fields::from(vec![
        Field::new(
            "city",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::Utf8),
            ),
            true,
        ),
        Field::new("name", ArrowDataType::Utf8, true),
    ]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        ArrowDataType::Struct(struct_fields.clone()),
        true,
    )]));
    let items_arr = StructArray::new(
        struct_fields,
        vec![
            Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                Arc::new(StringArray::from_iter_values(vec![
                    "quebec",
                    "fredericton",
                    "halifax",
                ])),
            )),
            Arc::new(StringArray::from_iter_values(vec![
                "albert", "terry", "lance", "", "tim",
            ])),
        ],
        // Row 3 is null at the struct level.
        Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
    );
    let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    // All 5 rows fit into one read batch of up to 8 rows.
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(read.len(), 1);
    assert_eq!(&batch, &read[0])
}
/// Configuration for one `single_column_reader_test` run: how the file is
/// written (row groups, page sizes, encoding, statistics) and how it is
/// read back (batch size, selections, filters, limit/offset).
#[derive(Clone)]
struct TestOptions {
    // Number of row groups to write.
    num_row_groups: usize,
    // Number of rows in each row group.
    num_rows: usize,
    // Batch size used when reading the file back.
    record_batch_size: usize,
    // Percentage of null values (0..=100); `None` writes a required column.
    null_percent: Option<usize>,
    // Writer's internal batch size.
    write_batch_size: usize,
    // Maximum size of a data page in bytes.
    max_data_page_size: usize,
    // Maximum size of a dictionary page in bytes.
    max_dict_page_size: usize,
    // Parquet writer version to produce.
    writer_version: WriterVersion,
    // Statistics level written to the file.
    enabled_statistics: EnabledStatistics,
    // Column encoding under test.
    encoding: Encoding,
    // Optional row selection plus the number of rows it selects.
    row_selections: Option<(RowSelection, usize)>,
    // Optional per-row keep/drop mask applied as a row filter predicate.
    row_filter: Option<Vec<bool>>,
    // Optional cap on the number of rows returned.
    limit: Option<usize>,
    // Optional number of rows to skip before returning any.
    offset: Option<usize>,
}
impl std::fmt::Debug for TestOptions {
    // Manual Debug: `row_selections`/`row_filter` can be large, so only
    // their presence is printed, keeping test-failure output readable.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TestOptions")
            .field("num_row_groups", &self.num_row_groups)
            .field("num_rows", &self.num_rows)
            .field("record_batch_size", &self.record_batch_size)
            .field("null_percent", &self.null_percent)
            .field("write_batch_size", &self.write_batch_size)
            .field("max_data_page_size", &self.max_data_page_size)
            .field("max_dict_page_size", &self.max_dict_page_size)
            .field("writer_version", &self.writer_version)
            .field("enabled_statistics", &self.enabled_statistics)
            .field("encoding", &self.encoding)
            .field("row_selections", &self.row_selections.is_some())
            .field("row_filter", &self.row_filter.is_some())
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .finish()
    }
}
impl Default for TestOptions {
    // Baseline configuration: two 100-row row groups, plain encoding,
    // page-level statistics, generous page limits, and no nulls,
    // selections, filters, limit or offset.
    fn default() -> Self {
        Self {
            num_row_groups: 2,
            num_rows: 100,
            record_batch_size: 15,
            null_percent: None,
            write_batch_size: 64,
            max_data_page_size: 1024 * 1024,
            max_dict_page_size: 1024 * 1024,
            writer_version: WriterVersion::PARQUET_1_0,
            enabled_statistics: EnabledStatistics::Page,
            encoding: Encoding::PLAIN,
            row_selections: None,
            row_filter: None,
            limit: None,
            offset: None,
        }
    }
}
impl TestOptions {
    /// Options for `num_row_groups` row groups of `num_rows` rows each,
    /// read back in batches of `record_batch_size`; everything else is
    /// defaulted.
    fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
        Self {
            num_row_groups,
            num_rows,
            record_batch_size,
            ..Default::default()
        }
    }
    /// Write roughly `null_percent` percent of values as null.
    fn with_null_percent(mut self, null_percent: usize) -> Self {
        self.null_percent = Some(null_percent);
        self
    }
    fn with_max_data_page_size(mut self, max_data_page_size: usize) -> Self {
        self.max_data_page_size = max_data_page_size;
        self
    }
    fn with_max_dict_page_size(mut self, max_dict_page_size: usize) -> Self {
        self.max_dict_page_size = max_dict_page_size;
        self
    }
    fn with_enabled_statistics(mut self, enabled_statistics: EnabledStatistics) -> Self {
        self.enabled_statistics = enabled_statistics;
        self
    }
    /// Attach a random row selection. Must be called before
    /// `with_row_filter` so the filter length matches the selected rows.
    fn with_row_selections(mut self) -> Self {
        assert!(self.row_filter.is_none(), "Must set row selection first");
        let mut rng = rng();
        let step = rng.random_range(self.record_batch_size..self.num_rows);
        let row_selections = create_test_selection(
            step,
            self.num_row_groups * self.num_rows,
            rng.random::<bool>(),
        );
        self.row_selections = Some(row_selections);
        self
    }
    /// Attach a random row filter keeping ~90% of the (possibly already
    /// selected) rows.
    fn with_row_filter(mut self) -> Self {
        let row_count = match &self.row_selections {
            Some((_, count)) => *count,
            None => self.num_row_groups * self.num_rows,
        };
        let mut rng = rng();
        self.row_filter = Some((0..row_count).map(|_| rng.random_bool(0.9)).collect());
        self
    }
    fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
    fn with_offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }
    /// Translate these options into `WriterProperties` for the test writer.
    fn writer_props(&self) -> WriterProperties {
        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(self.max_data_page_size)
            .set_write_batch_size(self.write_batch_size)
            .set_writer_version(self.writer_version)
            .set_statistics_enabled(self.enabled_statistics);
        // Dictionary encodings are enabled via the dictionary switch rather
        // than `set_encoding`; everything else sets the encoding directly.
        builder = if matches!(
            self.encoding,
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            builder
                .set_dictionary_enabled(true)
                .set_dictionary_page_size_limit(self.max_dict_page_size)
        } else {
            builder
                .set_dictionary_enabled(false)
                .set_encoding(self.encoding)
        };
        builder.build()
    }
}
/// Runs `single_column_reader_test` over a matrix of `TestOptions`
/// (row-group/page sizes, null densities, row selections, filters, limits
/// and offsets), crossed with both writer versions and every encoding in
/// `encodings`.
///
/// * `rand_max` - bound handed to the random value generator `G`
/// * `converted_type` - parquet converted type for the column
/// * `arrow_type` - optional Arrow type hint applied when reading
/// * `converter` - builds the expected Arrow array from generated values
fn run_single_column_reader_tests<T, F, G>(
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
    encodings: &[Encoding],
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    let all_options = vec![
        // Basic shapes and page-size limits.
        TestOptions::new(2, 100, 15),
        TestOptions::new(3, 25, 5),
        TestOptions::new(4, 100, 25),
        TestOptions::new(3, 256, 73).with_max_data_page_size(128),
        TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
        // Null densities.
        TestOptions::new(2, 256, 127).with_null_percent(0),
        TestOptions::new(2, 256, 93).with_null_percent(25),
        // Limits and offsets (including limit 0 and limit > total rows).
        TestOptions::new(4, 100, 25).with_limit(0),
        TestOptions::new(4, 100, 25).with_limit(50),
        TestOptions::new(4, 100, 25).with_limit(10),
        TestOptions::new(4, 100, 25).with_limit(101),
        TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
        TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
        // Reduced statistics levels.
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::Chunk),
        TestOptions::new(2, 256, 91)
            .with_null_percent(25)
            .with_enabled_statistics(EnabledStatistics::None),
        TestOptions::new(2, 128, 91)
            .with_null_percent(100)
            .with_enabled_statistics(EnabledStatistics::None),
        // Row selections combined with the shapes above.
        TestOptions::new(2, 100, 15).with_row_selections(),
        TestOptions::new(3, 25, 5).with_row_selections(),
        TestOptions::new(4, 100, 25).with_row_selections(),
        TestOptions::new(3, 256, 73)
            .with_max_data_page_size(128)
            .with_row_selections(),
        TestOptions::new(3, 256, 57)
            .with_max_dict_page_size(128)
            .with_row_selections(),
        TestOptions::new(2, 256, 127)
            .with_null_percent(0)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_limit(10),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_row_selections()
            .with_offset(20)
            .with_limit(10),
        // Row filters, alone and stacked with selections and tiny pages.
        TestOptions::new(4, 100, 25).with_row_filter(),
        TestOptions::new(4, 100, 25)
            .with_row_selections()
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_filter(),
        TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(10)
            .with_row_selections()
            .with_row_filter(),
        // Selections over tiny pages without any statistics.
        TestOptions::new(2, 256, 93)
            .with_enabled_statistics(EnabledStatistics::None)
            .with_max_data_page_size(10)
            .with_row_selections(),
    ];
    all_options.into_iter().for_each(|opts| {
        for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            for encoding in encodings {
                // Cross each option set with every writer version/encoding.
                let opts = TestOptions {
                    writer_version,
                    encoding: *encoding,
                    ..opts.clone()
                };
                single_column_reader_test::<T, _, G>(
                    opts,
                    rand_max,
                    converted_type,
                    arrow_type.clone(),
                    &converter,
                )
            }
        }
    });
}
/// End-to-end single-column round-trip test: writes randomly generated values
/// of physical type `T` to a temporary parquet file and reads them back with
/// `ParquetRecordBatchReader`, verifying every decoded batch matches the
/// independently computed expectation.
///
/// * `opts` - writer/reader knobs (row groups, encodings, selection, filter, ...)
/// * `rand_max` - upper bound for generated values (also the byte length used
///   for FIXED_LEN_BYTE_ARRAY columns)
/// * `converted_type` - parquet logical annotation for the leaf column
/// * `arrow_type` - optional arrow type embedded in the file's arrow schema
/// * `converter` - turns the expected `Option<T::T>` values into an arrow array
fn single_column_reader_test<T, F, G>(
    opts: TestOptions,
    rand_max: i32,
    converted_type: ConvertedType,
    arrow_type: Option<ArrowDataType>,
    converter: F,
) where
    T: DataType,
    G: RandGen<T>,
    F: Fn(&[Option<T::T>]) -> ArrayRef,
{
    // Print the parameters so failures on randomized data are reproducible.
    println!(
        "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
        T::get_physical_type(),
        converted_type,
        arrow_type,
        opts
    );
    // Randomly derive per-row-group definition levels when nulls are requested:
    // level 1 => value present, level 0 => null slot.
    let (repetition, def_levels) = match opts.null_percent.as_ref() {
        Some(null_percent) => {
            let mut rng = rng();
            let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                .map(|_| {
                    std::iter::from_fn(|| {
                        Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                    })
                    .take(opts.num_rows)
                    .collect()
                })
                .collect();
            (Repetition::OPTIONAL, Some(def_levels))
        }
        None => (Repetition::REQUIRED, None),
    };
    // Generate only as many values per row group as there are non-null slots.
    let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
        .map(|idx| {
            let null_count = match def_levels.as_ref() {
                Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                None => 0,
            };
            G::gen_vec(rand_max, opts.num_rows - null_count)
        })
        .collect();
    // Fixed-width physical types need an explicit byte length; -1 means unused.
    let len = match T::get_physical_type() {
        crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
        crate::basic::Type::INT96 => 12,
        _ => -1,
    };
    // Single-leaf parquet schema for the column under test.
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", T::get_physical_type())
            .with_repetition(repetition)
            .with_converted_type(converted_type)
            .with_length(len)
            .build()
            .unwrap(),
    )];
    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );
    let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
    // Write the generated data to a temp file, then rewind it for reading.
    let mut file = tempfile::tempfile().unwrap();
    generate_single_column_file_with_data::<T>(
        &values,
        def_levels.as_ref(),
        file.try_clone().unwrap(),
        schema,
        arrow_field,
        &opts,
    )
    .unwrap();
    file.rewind().unwrap();
    // Only load the page index when page-level statistics were written.
    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
        opts.enabled_statistics == EnabledStatistics::Page,
    ));
    let mut builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    // Apply the configured row selection, trimming the expectation to match.
    let expected_data = match opts.row_selections {
        Some((selections, row_count)) => {
            let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            let mut skip_data: Vec<Option<T::T>> = vec![];
            let dequeue: VecDeque<RowSelector> = selections.clone().into();
            for select in dequeue {
                if select.skip {
                    // Skipped rows are dropped from the expectation entirely.
                    without_skip_data.drain(0..select.row_count);
                } else {
                    skip_data.extend(without_skip_data.drain(0..select.row_count));
                }
            }
            builder = builder.with_row_selection(selections);
            assert_eq!(skip_data.len(), row_count);
            skip_data
        }
        None => {
            let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
            assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
            expected_data
        }
    };
    // Apply the configured row filter, trimming the expectation in lock-step.
    let mut expected_data = match opts.row_filter {
        Some(filter) => {
            let expected_data = expected_data
                .into_iter()
                .zip(filter.iter())
                .filter_map(|(d, f)| f.then(|| d))
                .collect();
            // The predicate is invoked batch-by-batch, so the closure tracks
            // its own running offset into the full boolean mask.
            let mut filter_offset = 0;
            let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))]);
            builder = builder.with_row_filter(filter);
            expected_data
        }
        None => expected_data,
    };
    // Offset / limit are applied after selection and filtering.
    if let Some(offset) = opts.offset {
        builder = builder.with_offset(offset);
        expected_data = expected_data.into_iter().skip(offset).collect();
    }
    if let Some(limit) = opts.limit {
        builder = builder.with_limit(limit);
        expected_data = expected_data.into_iter().take(limit).collect();
    }
    let mut record_reader = builder
        .with_batch_size(opts.record_batch_size)
        .build()
        .unwrap();
    // Compare each decoded batch against the matching slice of expectations.
    let mut total_read = 0;
    loop {
        let maybe_batch = record_reader.next();
        if total_read < expected_data.len() {
            let end = min(total_read + opts.record_batch_size, expected_data.len());
            let batch = maybe_batch.unwrap().unwrap();
            assert_eq!(end - total_read, batch.num_rows());
            let a = converter(&expected_data[total_read..end]);
            let b = batch.column(0);
            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.to_data(), b.to_data());
            assert_eq!(
                a.as_any().type_id(),
                b.as_any().type_id(),
                "incorrect type ids"
            );
            total_read = end;
        } else {
            // Once the expectation is exhausted the reader must be too.
            assert!(maybe_batch.is_none());
            break;
        }
    }
}
/// Interleaves the generated `values` with nulls according to `def_levels`,
/// producing the flat, row-order expected output across all row groups.
/// With no definition levels every value is present.
fn gen_expected_data<T: DataType>(
    def_levels: Option<&Vec<Vec<i16>>>,
    values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
    match def_levels {
        None => values.iter().flatten().cloned().map(Some).collect(),
        Some(levels) => {
            let mut remaining = values.iter().flatten();
            levels
                .iter()
                .flatten()
                .map(|level| match level {
                    // Definition level 1: the next generated value is present.
                    1 => Some(remaining.next().cloned().unwrap()),
                    // Definition level 0: a null slot.
                    0 => None,
                    _ => unreachable!(),
                })
                .collect()
        }
    }
}
/// Writes one leaf column of `values` (one inner `Vec` per row group) to
/// `file` using the writer configuration from `opts`, optionally embedding an
/// arrow schema for `field` in the file metadata. Returns the file metadata.
fn generate_single_column_file_with_data<T: DataType>(
    values: &[Vec<T::T>],
    def_levels: Option<&Vec<Vec<i16>>>,
    file: File,
    schema: TypePtr,
    field: Option<Field>,
    opts: &TestOptions,
) -> Result<ParquetMetaData> {
    let mut props = opts.writer_props();
    // Embedding the arrow schema lets readers recover the arrow-level type.
    if let Some(field) = field {
        add_encoded_arrow_schema_to_metadata(&Schema::new(vec![field]), &mut props);
    }

    let mut writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;
    for (group_idx, group_values) in values.iter().enumerate() {
        let group_def_levels = def_levels.map(|levels| levels[group_idx].as_slice());
        let mut row_group = writer.next_row_group()?;
        {
            let mut column = row_group.next_column()?.expect("Column writer is none!");
            column
                .typed::<T>()
                .write_batch(group_values, group_def_levels, None)?;
            column.close()?;
        }
        row_group.close()?;
    }
    writer.close()
}
/// Opens `file_name` from the arrow test-data directory, panicking if absent.
fn get_test_file(file_name: &str) -> File {
    let path = PathBuf::from(arrow::util::test_util::arrow_test_data()).join(file_name);
    File::open(&path).expect("File not found!")
}
/// Reads a file of nested structs in full, then re-reads it with a leaf
/// projection and verifies the projected arrow schema on every batch.
#[test]
fn test_read_structs() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    // First pass: a plain full read must succeed.
    let file = File::open(&path).unwrap();
    for batch in ParquetRecordBatchReader::try_new(file, 60).unwrap() {
        batch.unwrap();
    }

    // Second pass: project leaves 3, 8 and 10 only.
    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num = Field::new(
        "roll_num",
        ArrowDataType::Struct(Fields::from(vec![Field::new(
            "count",
            ArrowDataType::UInt64,
            false,
        )])),
        false,
    );
    let pc_cur = Field::new(
        "PC_CUR",
        ArrowDataType::Struct(Fields::from(vec![
            Field::new("mean", ArrowDataType::Int64, false),
            Field::new("sum", ArrowDataType::Int64, false),
        ])),
        false,
    );
    let expected_schema = Schema::new(vec![roll_num, pc_cur]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());
    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
/// Same as `test_read_structs`, but selects the projected leaves by dotted
/// column path instead of leaf ordinal.
#[test]
fn test_read_structs_by_name() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_structs.rust.parquet");

    // Full read must succeed before testing the projection.
    let file = File::open(&path).unwrap();
    for batch in ParquetRecordBatchReader::try_new(file, 60).unwrap() {
        batch.unwrap();
    }

    let file = File::open(&path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::columns(
        builder.parquet_schema(),
        ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
    );
    let projected_reader = builder
        .with_projection(mask)
        .with_batch_size(60)
        .build()
        .unwrap();

    let roll_num = Field::new(
        "roll_num",
        ArrowDataType::Struct(Fields::from(vec![Field::new(
            "count",
            ArrowDataType::UInt64,
            false,
        )])),
        false,
    );
    let pc_cur = Field::new(
        "PC_CUR",
        ArrowDataType::Struct(Fields::from(vec![
            Field::new("mean", ArrowDataType::Int64, false),
            Field::new("sum", ArrowDataType::Int64, false),
        ])),
        false,
    );
    let expected_schema = Schema::new(vec![roll_num, pc_cur]);

    assert_eq!(&expected_schema, projected_reader.schema().as_ref());
    for batch in projected_reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
    }
}
/// Smoke test: a snappy-compressed file containing nested maps decodes cleanly.
#[test]
fn test_read_maps() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_maps.snappy.parquet");
    let reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();
    for batch in reader {
        batch.unwrap();
    }
}
/// Verifies that the nullability of an optional parquet group is surfaced on
/// the arrow struct: four rows with definition levels [0, 1, 0, 1] decode to
/// a struct column with two nulls.
#[test]
fn test_nested_nullability() {
    let message_type = "message nested {
OPTIONAL Group group {
REQUIRED INT32 leaf;
}
}";
    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Write two leaf values; def level 0 marks a null group instance.
    {
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                .unwrap();
        {
            let mut row_group = writer.next_row_group().unwrap();
            let mut column = row_group.next_column().unwrap().unwrap();
            column
                .typed::<Int32Type>()
                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                .unwrap();
            column.close().unwrap();
            row_group.close().unwrap();
        }
        writer.close().unwrap();
    }

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let reader = builder.with_projection(mask).build().unwrap();

    let expected_schema = Schema::new(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]);

    let batch = reader.into_iter().next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    // Definition levels [0, 1, 0, 1] => rows 0 and 2 are null structs.
    assert_eq!(batch.column(0).null_count(), 2);
}
/// Verifies that consecutive batches reuse the same backing dictionary values
/// while a dictionary remains in effect, and that batches after a dictionary
/// change observe different backing data.
#[test]
fn test_dictionary_preservation() {
    // Single optional UTF8 byte-array leaf.
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_converted_type(ConvertedType::UTF8)
            .build()
            .unwrap(),
    )];
    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );
    // Request a dictionary-encoded arrow representation on read.
    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );
    let arrow_field = Field::new("leaf", dict_type, true);
    let mut file = tempfile::tempfile().unwrap();
    // Two row groups of values; definition level 0 marks a null slot, so the
    // two groups cover 8 + 9 = 17 rows in total.
    let values = vec![
        vec![
            ByteArray::from("hello"),
            ByteArray::from("a"),
            ByteArray::from("b"),
            ByteArray::from("d"),
        ],
        vec![
            ByteArray::from("c"),
            ByteArray::from("a"),
            ByteArray::from("b"),
        ],
    ];
    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];
    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };
    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(),
        schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();
    file.rewind().unwrap();
    // 17 rows read 3 at a time => 6 batches.
    let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
    let batches = record_reader
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|x| x.num_columns() == 1));
    let row_counts = batches
        .iter()
        .map(|x| (x.num_rows(), x.column(0).null_count()))
        .collect::<Vec<_>>();
    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );
    // Extract the dictionary values buffer backing each batch's column.
    let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
    // Batches 0-1 share one dictionary; batches 2 and 3 each differ from
    // their predecessor; batches 3-5 share a dictionary again.
    assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
    assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
    assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
    assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
    assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}
/// A single row whose list value is present but empty must decode to one
/// valid, zero-length list entry.
#[test]
fn test_read_null_list() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let mut reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.column(0).len(), 1);

    let list = batch
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(list.len(), 1);
    // The entry itself is valid (not null) but contains no elements.
    assert!(list.is_valid(0));
    assert_eq!(list.value(0).len(), 0);
}
/// With arrow metadata skipped, the null-list column must be inferred as a
/// nullable `List<Null>` field directly from the parquet schema.
#[test]
fn test_null_schema_inference() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();

    let expected_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let schema = builder.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0), &expected_field);
}
/// `skip_arrow_metadata` must drop the embedded arrow schema (including
/// per-field metadata) for files produced by both writer versions, while the
/// default read preserves it.
#[test]
fn test_skip_metadata() {
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();
    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    // Writes `batch` with the requested writer version, returning the file.
    let write_versioned_file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let written = write_versioned_file(version);

        // Default read preserves the embedded schema metadata...
        let arrow_reader =
            ParquetRecordBatchReader::try_new(written.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        // ...while skipping arrow metadata re-infers a metadata-free schema.
        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(written, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }
}
/// Builds a `RecordBatch` from `(name, array)` pairs and writes it to a fresh
/// temporary parquet file, returning the file handle.
fn write_parquet_from_iter<I, F>(value: I) -> File
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let file = tempfile().unwrap();
    let batch = RecordBatch::try_from_iter(value).unwrap();
    let schema = batch.schema().clone();
    let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), schema, None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    file
}
/// Writes the given columns to parquet, then asserts that opening the file
/// with `schema` supplied as the arrow schema fails with `expected_error`.
fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
where
    I: IntoIterator<Item = (F, ArrayRef)>,
    F: AsRef<str>,
{
    let file = write_parquet_from_iter(value);
    let options = ArrowReaderOptions::new().with_schema(schema.clone());
    let result =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options);
    assert_eq!(result.err().unwrap().to_string(), expected_error);
}
/// A supplied schema with fewer top-level fields than the file is rejected.
#[test]
fn test_schema_too_few_columns() {
    let written = vec![
        ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
        ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
    ];
    let supplied = Arc::new(Schema::new(vec![Field::new(
        "int64",
        ArrowDataType::Int64,
        false,
    )]));
    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}
/// A supplied schema with more top-level fields than the file is rejected.
#[test]
fn test_schema_too_many_columns() {
    let written = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied = Arc::new(Schema::new(vec![
        Field::new("int64", ArrowDataType::Int64, false),
        Field::new("int32", ArrowDataType::Int32, false),
    ]));
    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}
/// A supplied field whose name differs from the stored column is rejected.
#[test]
fn test_schema_mismatched_column_names() {
    let written = vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)];
    let supplied = Arc::new(Schema::new(vec![Field::new(
        "other",
        ArrowDataType::Int64,
        false,
    )]));
    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}
/// Multiple incompatible top-level columns must all be reported in one error,
/// while the compatible column in between is not mentioned.
#[test]
fn test_schema_incompatible_columns() {
    let written = vec![
        (
            "col1_invalid",
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ),
        (
            "col2_valid",
            Arc::new(Int32Array::from(vec![0])) as ArrayRef,
        ),
        (
            "col3_invalid",
            Arc::new(Date64Array::from(vec![0])) as ArrayRef,
        ),
    ];
    let supplied = Arc::new(Schema::new(vec![
        Field::new("col1_invalid", ArrowDataType::Int32, false),
        Field::new("col2_valid", ArrowDataType::Int32, false),
        Field::new("col3_invalid", ArrowDataType::Int32, false),
    ]));
    run_schema_test_with_error(
        written,
        supplied,
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
    );
}
/// A nested struct child whose type mismatches the supplied schema must yield
/// an error naming the nested field, with both struct types spelled out.
#[test]
fn test_one_incompatible_nested_column() {
    // Written file: struct with (Utf8, Int64) children.
    let nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        nested_fields,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");
    // Supplied schema: second child requested as Int32 instead of Int64.
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    run_schema_test_with_error(
        vec![
            ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ("nested", Arc::new(nested) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1", ArrowDataType::Int64, false),
            Field::new("col2", ArrowDataType::Int32, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ])),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
         requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
         but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
    );
}
/// Writes a tiny single-column utf8 parquet file into an in-memory buffer.
fn utf8_parquet() -> Bytes {
    let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    Bytes::from(buffer)
}
/// Supplying Int32 for a column stored as Utf8 must fail with a descriptive
/// error naming the requested and actual types.
#[test]
fn test_schema_error_bad_types() {
    let parquet_data = utf8_parquet();
    let supplied: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Int32,
        false,
    )]));
    let options = ArrowReaderOptions::new().with_schema(supplied);
    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, options)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
    )
}
/// Requesting a nullable field for a column stored as non-nullable must fail
/// with a nullability-mismatch error.
#[test]
fn test_schema_error_bad_nullability() {
    let parquet_data = utf8_parquet();
    let supplied: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "column1",
        arrow::datatypes::DataType::Utf8,
        true,
    )]));
    let options = ArrowReaderOptions::new().with_schema(supplied);
    let err = ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, options)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
    )
}
/// Binary, large-binary and binary-view columns can be read back as the
/// corresponding utf8 arrow types when the caller supplies a schema
/// requesting the conversion.
#[test]
fn test_read_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
    ]);

    // Request the utf8 counterpart of each stored binary type.
    let supplied_fields = Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);

    let expected = vec![Some("one"), Some("two"), Some("three")];
    assert_eq!(
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .collect::<Vec<_>>(),
        expected
    );
    assert_eq!(
        batch
            .column(1)
            .as_string::<i64>()
            .iter()
            .collect::<Vec<_>>(),
        expected
    );
    assert_eq!(
        batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
        expected
    );
}
/// Requesting Utf8 for a column containing invalid UTF-8 bytes must fail
/// while reading (the harness expects the "Invalid UTF8 sequence at" panic).
#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(vec![
            b"\xDE\x00\xFF".as_ref(),
            b"\xDE\x01\xAA".as_ref(),
            b"\xDE\x02\xFF".as_ref(),
        ])) as ArrayRef,
    )]);

    let requested = Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]);
    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(requested)));
    let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    // Attempt to decode; the invalid UTF-8 data triggers the failure.
    reader.next().unwrap().unwrap_err();
}
/// Supplying a compatible arrow schema on read must coerce the stored types:
/// int32 -> timestamp(s, tz), date32 -> date64, and inside a nested struct
/// utf8 -> dictionary and int64 -> timestamp(ns, tz).
#[test]
fn test_with_schema() {
    // Data as written: plain utf8 and int64 inside a struct.
    let nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);
    let nested_arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];
    let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);
    // The schema the reader is asked to produce instead.
    let supplied_nested_fields = Fields::from(vec![
        Field::new(
            "utf8_to_dict",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int32),
                Box::new(ArrowDataType::Utf8),
            ),
            false,
        ),
        Field::new(
            "int64_to_ts_nano",
            ArrowDataType::Timestamp(
                arrow::datatypes::TimeUnit::Nanosecond,
                Some("+10:00".into()),
            ),
            false,
        ),
    ]);
    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new(
            "int32_to_ts_second",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
            false,
        ),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));
    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    // The reader must report exactly the supplied schema.
    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    // int32 0 read as seconds and rendered in +01:00.
    assert_eq!(
        batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .expect("downcast to timestamp second")
            .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 01:00:00 +01:00"
    );
    // date32 day 0 coerced to date64 is still the epoch date.
    assert_eq!(
        batch
            .column(1)
            .as_any()
            .downcast_ref::<Date64Array>()
            .expect("downcast to date64")
            .value_as_date(0)
            .map(|v| v.to_string())
            .expect("value as date"),
        "1970-01-01"
    );
    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");
    // "a","a","a","b" dictionary-encodes to values ["a","b"], keys [0,0,0,1].
    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");
    assert_eq!(
        nested_dict
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("downcast to string")
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("a"), Some("b")]
    );
    assert_eq!(
        nested_dict.keys().iter().collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0), Some(1)]
    );
    // int64 value 1 read as nanoseconds and rendered in +10:00.
    assert_eq!(
        nested
            .column(1)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .expect("downcast to timestamp nanosecond")
            .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 10:00:00.000000001 +10:00"
    );
}
/// Projecting zero columns still yields column-less batches whose row counts
/// sum to the file's total row count.
#[test]
fn test_empty_projection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let expected_rows = builder.metadata().file_metadata().num_rows() as usize;

    let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
    let reader = builder
        .with_projection(mask)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.num_columns(), 0);
        assert!(batch.num_rows() <= 2);
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, expected_rows);
}
/// Writes two batches of `batch_size` empty-list rows with row groups capped
/// at `row_group_size`, then checks the reader still returns two full
/// `batch_size` batches even when a batch spans a row-group boundary.
fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "list",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
        true,
    )]));

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(row_group_size))
        .build();
    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
    for _ in 0..2 {
        // Each row is an empty (but valid) list.
        let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
        for _ in 0..batch_size {
            list_builder.append(true);
        }
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
    for _ in 0..2 {
        assert_eq!(batch_size, reader.next().unwrap().unwrap().num_rows());
    }
}
/// Exercises batch-size / row-group-size combinations around the repetition
/// levels batch-size boundary.
#[test]
fn test_row_group_exact_multiple() {
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
    for (row_group_size, batch_size) in [
        (8, 8),
        (10, 8),
        (8, 10),
        (BATCH_SIZE, BATCH_SIZE),
        (BATCH_SIZE + 1, BATCH_SIZE),
        (BATCH_SIZE, BATCH_SIZE + 1),
        (BATCH_SIZE, BATCH_SIZE - 1),
        (BATCH_SIZE - 1, BATCH_SIZE),
    ] {
        test_row_group_batch(row_group_size, batch_size);
    }
}
/// Computes the batches a reader should produce for `column` under
/// `selection` with the given `batch_size`, by slicing the input record batch
/// at the selected row ranges.
///
/// All returned batches except the last are asserted to be exactly
/// `batch_size` rows, mirroring the reader's batching behavior.
fn get_expected_batches(
    column: &RecordBatch,
    selection: &RowSelection,
    batch_size: usize,
) -> Vec<RecordBatch> {
    let mut expected_batches = vec![];
    let mut selection: VecDeque<_> = selection.clone().into();
    let mut row_offset = 0;
    // Start row of the current contiguous run of selected rows, if any.
    let mut last_start = None;
    while row_offset < column.num_rows() && !selection.is_empty() {
        let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
        while batch_remaining > 0 && !selection.is_empty() {
            // Take at most `batch_remaining` rows from the front selector,
            // splitting it when it is larger than the remaining batch budget.
            let (to_read, skip) = match selection.front_mut() {
                Some(selection) if selection.row_count > batch_remaining => {
                    selection.row_count -= batch_remaining;
                    (batch_remaining, selection.skip)
                }
                Some(_) => {
                    let select = selection.pop_front().unwrap();
                    (select.row_count, select.skip)
                }
                None => break,
            };
            batch_remaining -= to_read;
            match skip {
                true => {
                    // A skip terminates any in-progress run of selected rows.
                    if let Some(last_start) = last_start.take() {
                        expected_batches.push(column.slice(last_start, row_offset - last_start))
                    }
                    row_offset += to_read
                }
                false => {
                    last_start.get_or_insert(row_offset);
                    row_offset += to_read
                }
            }
        }
    }
    // Flush the trailing run of selected rows, if any.
    if let Some(last_start) = last_start.take() {
        expected_batches.push(column.slice(last_start, row_offset - last_start))
    }
    // All but the final batch must be full-sized. Using `split_last` also
    // avoids the `len() - 1` underflow panic the previous slice expression
    // hit when the selection produced no batches at all.
    if let Some((_, full_batches)) = expected_batches.split_last() {
        for batch in full_batches {
            assert_eq!(batch.num_rows(), batch_size);
        }
    }
    expected_batches
}
/// Builds an alternating keep/skip `RowSelection` of `step_len`-sized runs
/// covering `total_len` rows (the final run may be shorter), starting with a
/// skip when `skip_first` is set. Returns the selection together with the
/// total number of selected (non-skipped) rows.
fn create_test_selection(
    step_len: usize,
    total_len: usize,
    skip_first: bool,
) -> (RowSelection, usize) {
    let mut selectors = vec![];
    let mut selected_count = 0;
    let mut skip = skip_first;
    let mut remaining = total_len;
    while remaining > 0 {
        let step = step_len.min(remaining);
        selectors.push(RowSelector {
            row_count: step,
            skip,
        });
        if !skip {
            selected_count += step;
        }
        remaining -= step;
        skip = !skip;
    }
    (selectors.into(), selected_count)
}
/// Reads a file under alternating skip/keep selections and checks the
/// produced batches match the slices computed by `get_expected_batches`.
#[test]
fn test_scan_row_with_selection() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();
    // Reference read: the whole file (7300 rows) in a single batch.
    let mut serial_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
    let data = serial_reader.next().unwrap().unwrap();
    // Note: `batch_size` doubles as the selection step length here;
    // `selection_len` only appears in the failure message.
    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
            let expected = get_expected_batches(&data, &selections, batch_size);
            let skip_reader = create_skip_reader(&test_file, batch_size, selections);
            assert_eq!(
                skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };
    do_test(1000, 1000);
    do_test(20, 20);
    do_test(20, 5);
    // NOTE(review): this repeats the previous case verbatim — possibly meant
    // to use different arguments (e.g. do_test(5, 20)); confirm intent.
    do_test(20, 5);
    // Builds a reader over `test_file` applying `selections`, with the page
    // index required.
    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        let options =
            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
        let file = test_file.try_clone().unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .with_batch_size(batch_size)
            .with_row_selection(selections)
            .build()
            .unwrap()
    }
}
/// A batch size larger than the file's row count must be clamped by the read
/// plan rather than allocating space for rows that do not exist.
#[test]
fn test_batch_size_overallocate() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_plain.parquet");
    let test_file = File::open(path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
    let num_rows = builder.metadata.file_metadata().num_rows();
    // The test is only meaningful if the requested size exceeds the row count.
    assert_ne!(1024, num_rows);

    let reader = builder
        .with_batch_size(1024)
        .with_projection(ProjectionMask::all())
        .build()
        .unwrap();
    assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
}
/// Requiring the page index succeeds both for a file that has one (the
/// offset index must be populated) and for a file without one (the offset
/// index stays absent and reading still works).
#[test]
fn test_read_with_page_index_enabled() {
    let testdata = arrow::util::test_util::parquet_test_data();

    let build = |file_name: &str| {
        let path = format!("{testdata}/{file_name}");
        ParquetRecordBatchReaderBuilder::try_new_with_options(
            File::open(path).unwrap(),
            ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
        )
        .unwrap()
    };

    // File written with a page index: the offset index must be loaded.
    let builder = build("alltypes_tiny_pages.parquet");
    assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 8);

    // File without a page index: none is loaded, reading still succeeds.
    let builder = build("alltypes_plain.parquet");
    assert!(builder.metadata().offset_index().is_none());
    let batches = builder
        .build()
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
}
/// Writes a one-row file with optional, repeated, and nested-repeated INT32
/// columns through the low-level column writer API, then checks that
/// projecting each leaf individually reproduces the corresponding column of
/// the unprojected read.
#[test]
fn test_raw_repetition() {
    const MESSAGE_TYPE: &str = "
message Log {
OPTIONAL INT32 eventType;
REPEATED INT32 category;
REPEATED group filter {
OPTIONAL INT32 error;
}
}
";
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
    let props = Default::default();
    let mut buf = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    // Column 0 (eventType): one optional value.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), None)
        .unwrap();
    col_writer.close().unwrap();
    // Column 1 (category): two repeated values; rep levels [0, 1] keep both
    // in the same row.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
        .unwrap();
    col_writer.close().unwrap();
    // Column 2 (filter.error): a single nested value.
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&[1], Some(&[1]), Some(&[0]))
        .unwrap();
    col_writer.close().unwrap();
    let rg_md = row_group_writer.close().unwrap();
    // All written values belong to one logical row.
    assert_eq!(rg_md.num_rows(), 1);
    writer.close().unwrap();
    let bytes = Bytes::from(buf);
    // Unprojected read: all three columns in one batch.
    let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
    let full = no_mask.next().unwrap().unwrap();
    assert_eq!(full.num_columns(), 3);
    // Each single-leaf projection must reproduce the corresponding column.
    for idx in 0..3 {
        let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
        let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
        let mut reader = b.with_projection(mask).build().unwrap();
        let projected = reader.next().unwrap().unwrap();
        assert_eq!(projected.num_columns(), 1);
        assert_eq!(full.column(idx), projected.column(0));
    }
}
/// Reads a file compressed with the LZ4_RAW codec and checks all three
/// columns against known values.
///
/// Uses distinct bindings per column (`a`/`b`/`c`), consistent with
/// `test_read_lz4_hadoop_fallback`, instead of repeatedly shadowing `a`
/// with differently-typed arrays.
#[test]
fn test_read_lz4_raw() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/lz4_raw_compressed.parquet");
    let file = File::open(path).unwrap();
    let batches = ParquetRecordBatchReader::try_new(file, 1024)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
    assert_eq!(
        a.values(),
        &[1593604800, 1593604800, 1593604801, 1593604801]
    );
    let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
    let b: Vec<_> = b.iter().flatten().collect();
    assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
    let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
    assert_eq!(c.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
}
/// Both the Hadoop-framed and the raw LZ4 file must decode to the same
/// four-row batch, exercising the fallback decompression logic.
#[test]
fn test_read_lz4_hadoop_fallback() {
    for file in [
        "hadoop_lz4_compressed.parquet",
        "non_hadoop_lz4_compressed.parquet",
    ] {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/{file}");
        let file = File::open(path).unwrap();
        let expected_rows = 4;
        let reader = ParquetRecordBatchReader::try_new(file, expected_rows).unwrap();
        let batches: Vec<_> = reader.collect::<Result<_, _>>().unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), expected_rows);
        // Column 0: int64 timestamps.
        let timestamps: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            timestamps.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );
        // Column 1: binary payloads.
        let payloads: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let payloads: Vec<_> = payloads.iter().flatten().collect();
        assert_eq!(payloads, &[b"abc", b"def", b"abc", b"def"]);
        // Column 2: float64 values.
        let floats: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(floats.values(), &[42.0, 7.7, 42.125, 7.7]);
    }
}
/// Reads a larger Hadoop-LZ4 file containing 10k UUID strings in a single
/// column and spot-checks the first and last two values.
#[test]
fn test_read_lz4_hadoop_large() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
    let file = File::open(path).unwrap();
    let expected_rows = 10000;
    let reader = ParquetRecordBatchReader::try_new(file, expected_rows).unwrap();
    let batches: Vec<_> = reader.collect::<Result<_, _>>().unwrap();
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), expected_rows);
    let strings: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
    let strings: Vec<_> = strings.iter().flatten().collect();
    // Spot-check both ends of the column.
    assert_eq!(strings[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
    assert_eq!(strings[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
    assert_eq!(strings[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
    assert_eq!(strings[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
}
// Reads nested lists from a snappy-compressed file, then checks that a
// "skip 1, select 1, skip 1" row selection yields exactly the middle row
// of the baseline read.
#[test]
#[cfg(feature = "snap")]
fn test_read_nested_lists() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/nested_lists.snappy.parquet");
let file = File::open(path).unwrap();
// Duplicate the handle so the same file can be read twice.
let f = file.try_clone().unwrap();
let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
let expected = reader.next().unwrap().unwrap();
assert_eq!(expected.num_rows(), 3);
let selection = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(1),
]);
let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
.unwrap()
.with_row_selection(selection)
.build()
.unwrap();
let actual = reader.next().unwrap().unwrap();
assert_eq!(actual.num_rows(), 1);
// The selected row must equal row 1 of the baseline batch.
assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
}
/// Round-trips Decimal128 columns with several precision/scale combinations
/// through a write/read cycle and checks the batch is unchanged.
#[test]
fn test_arbitrary_decimal() {
    let values = [1, 2, 3, 4, 5, 6, 7, 8];
    // Build a Decimal128 array over `values` with the given precision/scale.
    let make_decimals = |precision: u8, scale: i8| {
        Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(precision, scale)
            .unwrap()
    };
    let written = RecordBatch::try_from_iter([
        ("decimal_values_19_0", Arc::new(make_decimals(19, 0)) as ArrayRef),
        ("decimal_values_12_0", Arc::new(make_decimals(12, 0)) as ArrayRef),
        ("decimal_values_17_10", Arc::new(make_decimals(17, 10)) as ArrayRef),
    ])
    .unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
    writer.write(&written).unwrap();
    writer.close().unwrap();
    let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(&written.slice(0, 8), &read[0]);
}
// Verifies that row skipping works across data page boundaries for list
// columns: the writer is configured to place one row per data page.
#[test]
fn test_list_skip() {
let mut list = ListBuilder::new(Int32Builder::new());
list.append_value([Some(1), Some(2)]);
list.append_value([Some(3)]);
list.append_value([Some(4)]);
let list = list.finish();
let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
// One row per data page so skips must cross page boundaries.
let props = WriterProperties::builder()
.set_data_page_row_count_limit(1)
.set_write_batch_size(2)
.build();
let mut buffer = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
// Skip the first two rows; only the last row should be returned.
let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
.unwrap()
.with_row_selection(selection.into())
.build()
.unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 1);
assert_eq!(out, batch.slice(2, 1));
}
/// Round-trips a Decimal32 column, checking it is stored with the INT32
/// physical type and reads back unchanged.
fn test_decimal32_roundtrip() {
    // Precision 9, scale 2 -- the single-use builder closure of the
    // original is inlined here.
    let d1 = PrimitiveArray::<Decimal32Type>::from_iter_values([1, 2, 3, 4, 5])
        .with_precision_and_scale(9, 2)
        .unwrap();
    let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();
    let mut buffer = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
    // Decimal32 maps to the INT32 physical type.
    assert_eq!(
        builder.parquet_schema().columns()[0].physical_type(),
        PhysicalType::INT32
    );
    let mut reader = builder.build().unwrap();
    assert_eq!(batch.schema(), reader.schema());
    let roundtripped = reader.next().unwrap().unwrap();
    assert_eq!(batch, roundtripped);
}
// Round-trips Decimal64 columns: precision <= 9 is written as INT32 while
// larger precisions use INT64; values must read back unchanged.
fn test_decimal64_roundtrip() {
// Build a Decimal64 array with the given precision and scale 2.
let d = |values: Vec<i64>, p: u8| {
let iter = values.into_iter();
PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
.with_precision_and_scale(p, 2)
.unwrap()
};
let d1 = d(vec![1, 2, 3, 4, 5], 9);
// Include the maximum representable value for each precision.
let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
let batch = RecordBatch::try_from_iter([
("d1", Arc::new(d1) as ArrayRef),
("d2", Arc::new(d2) as ArrayRef),
("d3", Arc::new(d3) as ArrayRef),
])
.unwrap();
let mut buffer = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
// Verify the physical type chosen for each precision.
let t1 = builder.parquet_schema().columns()[0].physical_type();
assert_eq!(t1, PhysicalType::INT32);
let t2 = builder.parquet_schema().columns()[1].physical_type();
assert_eq!(t2, PhysicalType::INT64);
let t3 = builder.parquet_schema().columns()[2].physical_type();
assert_eq!(t3, PhysicalType::INT64);
let mut reader = builder.build().unwrap();
assert_eq!(batch.schema(), reader.schema());
let out = reader.next().unwrap().unwrap();
assert_eq!(batch, out);
}
// Generic decimal round-trip used for Decimal128/Decimal256: checks the
// physical type selected for each precision (INT32, INT64, then
// FIXED_LEN_BYTE_ARRAY above precision 18) and that data reads back intact.
fn test_decimal_roundtrip<T: DecimalType>() {
// Build a decimal array of type T with the given precision and scale 2.
let d = |values: Vec<usize>, p: u8| {
let iter = values.into_iter().map(T::Native::usize_as);
PrimitiveArray::<T>::from_iter_values(iter)
.with_precision_and_scale(p, 2)
.unwrap()
};
let d1 = d(vec![1, 2, 3, 4, 5], 9);
// Include the maximum representable value for each precision.
let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
let batch = RecordBatch::try_from_iter([
("d1", Arc::new(d1) as ArrayRef),
("d2", Arc::new(d2) as ArrayRef),
("d3", Arc::new(d3) as ArrayRef),
("d4", Arc::new(d4) as ArrayRef),
])
.unwrap();
let mut buffer = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
// Verify the physical type chosen for each precision.
let t1 = builder.parquet_schema().columns()[0].physical_type();
assert_eq!(t1, PhysicalType::INT32);
let t2 = builder.parquet_schema().columns()[1].physical_type();
assert_eq!(t2, PhysicalType::INT64);
let t3 = builder.parquet_schema().columns()[2].physical_type();
assert_eq!(t3, PhysicalType::INT64);
let t4 = builder.parquet_schema().columns()[3].physical_type();
assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
let mut reader = builder.build().unwrap();
assert_eq!(batch.schema(), reader.schema());
let out = reader.next().unwrap().unwrap();
assert_eq!(batch, out);
}
// Entry point exercising decimal round-trips for all decimal widths.
#[test]
fn test_decimal() {
test_decimal32_roundtrip();
test_decimal64_roundtrip();
test_decimal_roundtrip::<Decimal128Type>();
test_decimal_roundtrip::<Decimal256Type>();
}
// Writes two batches of 1024 single-element string lists and reads them
// back with a row selection, verifying the total row count, the list
// offsets, and every selected value.
#[test]
fn test_list_selection() {
let schema = Arc::new(Schema::new(vec![Field::new_list(
"list",
Field::new_list_field(ArrowDataType::Utf8, true),
false,
)]));
let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
for i in 0..2 {
let mut list_a_builder = ListBuilder::new(StringBuilder::new());
for j in 0..1024 {
list_a_builder.values().append_value(format!("{i} {j}"));
list_a_builder.append(true);
}
let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
.unwrap();
writer.write(&batch).unwrap();
}
let _metadata = writer.close().unwrap();
let buf = Bytes::from(buf);
// Skip the first 100 rows of each 1024-row batch.
let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
.unwrap()
.with_row_selection(RowSelection::from(vec![
RowSelector::skip(100),
RowSelector::select(924),
RowSelector::skip(100),
RowSelector::select(924),
]))
.build()
.unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let batch = concat_batches(&schema, &batches).unwrap();
assert_eq!(batch.num_rows(), 924 * 2);
let list = batch.column(0).as_list::<i32>();
// Every list holds exactly one element, so offsets increase by one.
for w in list.value_offsets().windows(2) {
assert_eq!(w[0] + 1, w[1])
}
// Values resume at j == 100 within each source batch i.
let mut values = list.values().as_string::<i32>().iter();
for i in 0..2 {
for j in 100..1024 {
let expected = format!("{i} {j}");
assert_eq!(values.next().unwrap().unwrap(), &expected);
}
}
}
// Fuzz test for row selections over doubly nested (list-of-list) data with
// random nulls: several fixed skip/select patterns are checked against
// slices of the written batch at multiple batch sizes.
#[test]
fn test_list_selection_fuzz() {
let mut rng = rng();
let schema = Arc::new(Schema::new(vec![Field::new_list(
"list",
Field::new_list(
Field::LIST_FIELD_DEFAULT_NAME,
Field::new_list_field(ArrowDataType::Int32, true),
true,
),
true,
)]));
let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
// Build 2048 rows; each level (outer list, inner list, value) is null
// with probability 0.2 and list lengths are random in 0..10.
for _ in 0..2048 {
if rng.random_bool(0.2) {
list_a_builder.append(false);
continue;
}
let list_a_len = rng.random_range(0..10);
let list_b_builder = list_a_builder.values();
for _ in 0..list_a_len {
if rng.random_bool(0.2) {
list_b_builder.append(false);
continue;
}
let list_b_len = rng.random_range(0..10);
let int_builder = list_b_builder.values();
for _ in 0..list_b_len {
match rng.random_bool(0.2) {
true => int_builder.append_null(),
false => int_builder.append_value(rng.random()),
}
}
list_b_builder.append(true)
}
list_a_builder.append(true);
}
let array = Arc::new(list_a_builder.finish());
let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
writer.write(&batch).unwrap();
let _metadata = writer.close().unwrap();
let buf = Bytes::from(buf);
// Selection patterns covering leading/trailing skips and single-row picks.
let cases = [
vec![
RowSelector::skip(100),
RowSelector::select(924),
RowSelector::skip(100),
RowSelector::select(924),
],
vec![
RowSelector::select(924),
RowSelector::skip(100),
RowSelector::select(924),
RowSelector::skip(100),
],
vec![
RowSelector::skip(1023),
RowSelector::select(1),
RowSelector::skip(1023),
RowSelector::select(1),
],
vec![
RowSelector::select(1),
RowSelector::skip(1023),
RowSelector::select(1),
RowSelector::skip(1023),
],
];
for batch_size in [100, 1024, 2048] {
for selection in &cases {
let selection = RowSelection::from(selection.clone());
let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
.unwrap()
.with_row_selection(selection.clone())
.with_batch_size(batch_size)
.build()
.unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
assert_eq!(actual.num_rows(), selection.row_count());
// Compare each selected run against the matching input slice.
let mut batch_offset = 0;
let mut actual_offset = 0;
for selector in selection.iter() {
if selector.skip {
batch_offset += selector.row_count;
continue;
}
assert_eq!(
batch.slice(batch_offset, selector.row_count),
actual.slice(actual_offset, selector.row_count)
);
batch_offset += selector.row_count;
actual_offset += selector.row_count;
}
}
}
}
// Reads a file using the legacy nested-list encoding (item field named
// "array") and checks the first row's inner list against a manually
// constructed ListArray.
#[test]
fn test_read_old_nested_list() {
use arrow::datatypes::DataType;
use arrow::datatypes::ToByteSlice;
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/old_list_structure.parquet");
let test_file = File::open(path).unwrap();
// Expected inner value: a two-element list [[1, 2], [3, 4]] built from
// raw offset/value buffers.
let a_values = Int32Array::from(vec![1, 2, 3, 4]);
let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
"array",
DataType::Int32,
false,
))))
.len(2)
.add_buffer(a_value_offsets)
.add_child_data(a_values.into_data())
.build()
.unwrap();
let a = ListArray::from(a_list_data);
let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
let mut reader = builder.build().unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 1);
assert_eq!(out.num_columns(), 1);
// Drill into the outer list and compare row 0 with the expected array.
let c0 = out.column(0);
let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
let r0 = c0arr.value(0);
let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(r0arr, &a);
}
/// Reads a file whose map column lacks a value field; the file's second and
/// third columns should decode to element-wise equal lists.
#[test]
fn test_map_no_value() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/map_no_value.parquet");
    let file = File::open(path).unwrap();
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mut reader = builder.build().unwrap();
    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 3);
    let lhs = batch.column(1).as_list::<i32>();
    let rhs = batch.column(2).as_list::<i32>();
    assert_eq!(lhs.len(), rhs.len());
    for (left, right) in lhs.iter().zip(rhs.iter()) {
        assert_eq!(left, right);
    }
}
// A file containing a logical type annotation unknown to this crate must
// still load: the annotation is surfaced as `_Unknown` with its field id
// and the column is decoded via its physical type.
#[test]
fn test_read_unknown_logical_type() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/unknown-logical-type.parquet");
let test_file = File::open(path).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
.expect("Error creating reader builder");
let schema = builder.metadata().file_metadata().schema_descr();
// Column 0 carries a known logical type.
assert_eq!(
schema.column(0).logical_type_ref(),
Some(&LogicalType::String)
);
// Column 1's annotation is unknown; the raw field id is preserved.
assert_eq!(
schema.column(1).logical_type_ref(),
Some(&LogicalType::_Unknown { field_id: 2555 })
);
assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);
// Reading must still succeed despite the unknown annotation.
let mut reader = builder.build().unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 3);
assert_eq!(out.num_columns(), 2);
}
// Reads a file with an explicitly supplied Arrow schema plus a virtual
// row-number column, verifying both the data values and the generated
// zero-based row numbers.
#[test]
fn test_read_row_numbers() {
let file = write_parquet_from_iter(vec![(
"value",
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
)]);
let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
let row_number_field = Arc::new(
Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_schema(Arc::new(Schema::new(supplied_fields)))
.with_virtual_columns(vec![row_number_field.clone()])
.unwrap();
let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
file.try_clone().unwrap(),
options,
)
.expect("reader builder with schema")
.build()
.expect("reader with schema");
let batch = arrow_reader.next().unwrap().unwrap();
// The output schema is the supplied schema plus the virtual column.
let schema = Arc::new(Schema::new(vec![
Field::new("value", ArrowDataType::Int64, false),
(*row_number_field).clone(),
]));
assert_eq!(batch.schema(), schema);
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);
assert_eq!(
batch
.column(0)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(1), Some(2), Some(3)]
);
// Row numbers are zero-based file positions.
assert_eq!(
batch
.column(1)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(0), Some(1), Some(2)]
);
}
// Projects away every physical column (ProjectionMask::none) and reads only
// the virtual row-number column; the row count must still be correct.
#[test]
fn test_read_only_row_numbers() {
let file = write_parquet_from_iter(vec![(
"value",
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
)]);
let row_number_field = Arc::new(
Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_number_field.clone()])
.unwrap();
let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
// Number of physical leaf columns, needed to build an empty projection.
let num_columns = metadata
.metadata
.file_metadata()
.schema_descr()
.num_columns();
let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
.with_projection(ProjectionMask::none(num_columns))
.build()
.expect("reader with schema");
let batch = arrow_reader.next().unwrap().unwrap();
// Only the virtual column remains in the output schema.
let schema = Arc::new(Schema::new(vec![row_number_field]));
assert_eq!(batch.schema(), schema);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.num_rows(), 3);
assert_eq!(
batch
.column(0)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(0), Some(1), Some(2)]
);
}
// Row numbers reflect position in the file, so reading row groups out of
// order must return the original row numbers reordered accordingly.
#[test]
fn test_read_row_numbers_row_group_order() -> Result<()> {
let array = Int64Array::from_iter_values(5000..5100);
let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
let mut buffer = Vec::new();
// Cap row groups at 50 rows so the 100 rows span two row groups.
let options = WriterProperties::builder()
.set_max_row_group_row_count(Some(50))
.build();
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
writer.write(&batch_chunk)?;
}
writer.close()?;
let row_number_field = Arc::new(
Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
);
let buffer = Bytes::from(buffer);
let options =
ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;
// Default order: values and row numbers are both sequential.
let arrow_reader =
ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
.build()?;
assert_eq!(
ValuesAndRowNumbers {
values: (5000..5100).collect(),
row_numbers: (0..100).collect()
},
ValuesAndRowNumbers::new_from_reader(arrow_reader)
);
// Reversed row-group order: values and row numbers swap halves together.
let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
.with_row_groups(vec![1, 0])
.build()?;
assert_eq!(
ValuesAndRowNumbers {
values: (5050..5100).chain(5000..5050).collect(),
row_numbers: (50..100).chain(0..50).collect(),
},
ValuesAndRowNumbers::new_from_reader(arrow_reader)
);
Ok(())
}
// Pairs the "col" data values with the "row_number" virtual column values
// collected from a reader, enabling order-sensitive equality checks.
#[derive(Debug, PartialEq)]
struct ValuesAndRowNumbers {
// Contents of the "col" column, in read order.
values: Vec<i64>,
// Contents of the "row_number" virtual column, in read order.
row_numbers: Vec<i64>,
}
impl ValuesAndRowNumbers {
    /// Drains `reader`, accumulating the "col" values and the "row_number"
    /// virtual column values across all batches, in read order.
    fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
        let mut values = vec![];
        let mut row_numbers = vec![];
        for batch in reader {
            let batch = batch.expect("Could not read batch");
            values.extend(
                batch
                    .column_by_name("col")
                    .expect("Could not get col column")
                    .as_primitive::<arrow::datatypes::Int64Type>()
                    .iter()
                    .map(|v| v.expect("Could not get value")),
            );
            // Extend directly from the iterator; the intermediate
            // `collect::<Vec<_>>()` the original performed was needless.
            row_numbers.extend(
                batch
                    .column_by_name("row_number")
                    .expect("Could not get row_number column")
                    .as_primitive::<arrow::datatypes::Int64Type>()
                    .iter()
                    .map(|v| v.expect("Could not get row number")),
            );
        }
        Self {
            values,
            row_numbers,
        }
    }
}
// `with_virtual_columns` must reject ordinary fields: only fields whose
// extension type name starts with "arrow.virtual." are accepted, and the
// error message must identify the offending field.
#[test]
fn test_with_virtual_columns_rejects_non_virtual_fields() {
let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
assert_eq!(
ArrowReaderOptions::new()
.with_virtual_columns(vec![regular_field])
.unwrap_err()
.to_string(),
"Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
);
}
// Drives the shared row-number fuzz helper without a row filter: the reader
// is configured with a random row selection and the virtual row-number
// column, and the helper validates the returned row numbers.
#[test]
fn test_row_numbers_with_multiple_row_groups() {
test_row_numbers_with_multiple_row_groups_helper(
false,
|path, selection, _row_filter, batch_size| {
let file = File::open(path).unwrap();
let row_number_field = Arc::new(
Field::new("row_number", ArrowDataType::Int64, false)
.with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_number_field])
.unwrap();
let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
.unwrap()
.with_row_selection(selection)
.with_batch_size(batch_size)
.build()
.expect("Could not create reader");
reader
.collect::<Result<Vec<_>, _>>()
.expect("Could not read")
},
);
}
// Same as test_row_numbers_with_multiple_row_groups, but additionally
// applies the randomly generated row filter supplied by the helper.
#[test]
fn test_row_numbers_with_multiple_row_groups_and_filter() {
test_row_numbers_with_multiple_row_groups_helper(
true,
|path, selection, row_filter, batch_size| {
let file = File::open(path).unwrap();
let row_number_field = Arc::new(
Field::new("row_number", ArrowDataType::Int64, false)
.with_extension_type(RowNumber),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_number_field])
.unwrap();
let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
.unwrap()
.with_row_selection(selection)
.with_batch_size(batch_size)
.with_row_filter(row_filter.expect("No filter"))
.build()
.expect("Could not create reader");
reader
.collect::<Result<Vec<_>, _>>()
.expect("Could not read")
},
);
}
// Writes three two-row row groups and reads them alongside a virtual
// row-group-index column: each row must be tagged with the index of the
// row group it came from.
#[test]
fn test_read_row_group_indices() {
let array1 = Int64Array::from(vec![1, 2]);
let array2 = Int64Array::from(vec![3, 4]);
let array3 = Int64Array::from(vec![5, 6]);
let batch1 =
RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
let batch2 =
RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
let batch3 =
RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
let mut buffer = Vec::new();
// Cap row groups at two rows so each batch lands in its own row group.
let options = WriterProperties::builder()
.set_max_row_group_row_count(Some(2))
.build();
let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
writer.write(&batch1).unwrap();
writer.write(&batch2).unwrap();
writer.write(&batch3).unwrap();
writer.close().unwrap();
let file = Bytes::from(buffer);
let row_group_index_field = Arc::new(
Field::new("row_group_index", ArrowDataType::Int64, false)
.with_extension_type(RowGroupIndex),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_group_index_field.clone()])
.unwrap();
let mut arrow_reader =
ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
.expect("reader builder with virtual columns")
.build()
.expect("reader with virtual columns");
let batch = arrow_reader.next().unwrap().unwrap();
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 6);
assert_eq!(
batch
.column(0)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
);
// Two rows per row group => indices 0,0,1,1,2,2.
assert_eq!(
batch
.column(1)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
);
}
// Projects away all physical columns and reads only the virtual
// row-group-index column from a file with two row groups (3 + 2 rows).
#[test]
fn test_read_only_row_group_indices() {
let array1 = Int64Array::from(vec![1, 2, 3]);
let array2 = Int64Array::from(vec![4, 5]);
let batch1 =
RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
let batch2 =
RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
let mut buffer = Vec::new();
// Cap row groups at three rows so each batch forms its own row group.
let options = WriterProperties::builder()
.set_max_row_group_row_count(Some(3))
.build();
let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
writer.write(&batch1).unwrap();
writer.write(&batch2).unwrap();
writer.close().unwrap();
let file = Bytes::from(buffer);
let row_group_index_field = Arc::new(
Field::new("row_group_index", ArrowDataType::Int64, false)
.with_extension_type(RowGroupIndex),
);
let options = ArrowReaderOptions::new()
.with_virtual_columns(vec![row_group_index_field.clone()])
.unwrap();
let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
// Number of physical leaf columns, needed for the empty projection mask.
let num_columns = metadata
.metadata
.file_metadata()
.schema_descr()
.num_columns();
let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
.with_projection(ProjectionMask::none(num_columns))
.build()
.expect("reader with virtual columns only");
let batch = arrow_reader.next().unwrap().unwrap();
// Only the virtual column remains in the output schema.
let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
assert_eq!(batch.schema(), schema);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.num_rows(), 5);
// Three rows in row group 0, two rows in row group 1.
assert_eq!(
batch
.column(0)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>(),
vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
);
}
/// Reads row groups in reverse order with a virtual row-group-index column:
/// the index column must report original file positions, not read order.
#[test]
fn test_read_row_group_indices_with_selection() -> Result<()> {
    // Write three row groups of 10 rows each (values 0..30).
    let mut buffer = Vec::new();
    let options = WriterProperties::builder()
        .set_max_row_group_row_count(Some(10))
        .build();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        ArrowDataType::Int64,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
    for i in 0..3 {
        let start = i * 10;
        let array = Int64Array::from_iter_values(start..start + 10);
        let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
        writer.write(&batch)?;
    }
    writer.close()?;
    let file = Bytes::from(buffer);
    let row_group_index_field = Arc::new(
        Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
    );
    let options =
        ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
    // `file` and `options` are not used again, so move them instead of the
    // redundant clones the original made on their last use.
    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?
        .with_row_groups(vec![2, 1, 0])
        .build()?;
    let batches = arrow_reader.collect::<Result<Vec<_>, _>>()?;
    let combined = concat_batches(&batches[0].schema(), &batches)?;
    // Values come back in reversed row-group order: 20..30, 10..20, 0..10.
    let values = combined.column(0).as_primitive::<types::Int64Type>();
    let first_val = values.value(0);
    let last_val = values.value(combined.num_rows() - 1);
    assert_eq!(first_val, 20);
    assert_eq!(last_val, 9);
    // The virtual column reports the original file row-group indices.
    let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
    assert_eq!(rg_indices.value(0), 2);
    assert_eq!(rg_indices.value(10), 1);
    assert_eq!(rg_indices.value(20), 0);
    Ok(())
}
// Shared driver for the row-number tests: generates a random multi-row-group
// file, a random row selection (and optionally a random row filter), invokes
// `test_case` to produce batches, and checks that every returned row number
// matches its value (values are 1-based, row numbers 0-based).
pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
use_filter: bool,
test_case: F,
) where
F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
{
// Log the seed so failures can be reproduced.
let seed: u64 = random();
println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
let mut rng = StdRng::seed_from_u64(seed);
use tempfile::TempDir;
let tempdir = TempDir::new().expect("Could not create temp dir");
let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
let path = tempdir.path().join("test.parquet");
std::fs::write(&path, bytes).expect("Could not write file");
// Build a random selection whose selectors cover every row exactly once.
let mut case = vec![];
let mut remaining = metadata.file_metadata().num_rows();
while remaining > 0 {
let row_count = rng.random_range(1..=remaining);
remaining -= row_count;
case.push(RowSelector {
row_count: row_count as usize,
skip: rng.random_bool(0.5),
});
}
// Optionally build a stateful predicate passing ~99% of rows; it tracks
// how many rows previous batches consumed via `filter_offset`.
let filter = use_filter.then(|| {
let filter = (0..metadata.file_metadata().num_rows())
.map(|_| rng.random_bool(0.99))
.collect::<Vec<_>>();
let mut filter_offset = 0;
RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
ProjectionMask::all(),
move |b| {
let array = BooleanArray::from_iter(
filter
.iter()
.skip(filter_offset)
.take(b.num_rows())
.map(|x| Some(*x)),
);
filter_offset += b.num_rows();
Ok(array)
},
))])
});
let selection = RowSelection::from(case);
let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
// If the selection skipped every row there is nothing further to verify.
if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
return;
}
let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
.expect("Failed to concatenate");
let values = actual
.column(0)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>();
let row_numbers = actual
.column(1)
.as_primitive::<types::Int64Type>()
.iter()
.collect::<Vec<_>>();
// Values are 1..=num_rows while row numbers start at 0, so
// row_number + 1 == value must hold for every surviving row.
assert_eq!(
row_numbers
.into_iter()
.map(|number| number.map(|number| number + 1))
.collect::<Vec<_>>(),
values
);
}
// Writes a single-column file whose "value" column contains 1..=N for a
// random N, split across randomly sized batches. Flushing after every batch
// produces multiple row groups. Returns the encoded bytes and the metadata.
fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
"value",
ArrowDataType::Int64,
false,
)])));
let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
let mut values = 1..=rng.random_range(1..4096);
while !values.is_empty() {
// Pull a random-length chunk off the remaining range.
let batch_values = values
.by_ref()
.take(rng.random_range(1..4096))
.collect::<Vec<_>>();
let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
let batch =
RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
writer.write(&batch).expect("Could not write batch");
// Flush so each batch ends up in its own row group.
writer.flush().expect("Could not flush");
}
let metadata = writer.close().expect("Could not close writer");
(Bytes::from(buf), metadata)
}
}