use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use typed_builder::TypedBuilder;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
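/// Options controlling how Parquet files are read: how much of the footer to prefetch,
/// which page-level indexes to load eagerly, and how byte-range reads are coalesced and
/// parallelised.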
#[derive(Clone, Copy, Debug, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub(crate) struct ParquetReadOptions {
#[builder(default = Some(DEFAULT_METADATA_SIZE_HINT))]
pub(crate) metadata_size_hint: Option<usize>,
#[builder(default = DEFAULT_RANGE_COALESCE_BYTES)]
pub(crate) range_coalesce_bytes: u64,
#[builder(default = DEFAULT_RANGE_FETCH_CONCURRENCY)]
pub(crate) range_fetch_concurrency: usize,
#[builder(default = true)]
pub(crate) preload_column_index: bool,
#[builder(default = true)]
pub(crate) preload_offset_index: bool,
#[builder(default = false)]
pub(crate) preload_page_index: bool,
}
impl ParquetReadOptions {
pub(crate) fn metadata_size_hint(&self) -> Option<usize> {
self.metadata_size_hint
}
pub(crate) fn range_coalesce_bytes(&self) -> u64 {
self.range_coalesce_bytes
}
pub(crate) fn range_fetch_concurrency(&self) -> usize {
self.range_fetch_concurrency
}
pub(crate) fn preload_column_index(&self) -> bool {
self.preload_column_index
}
pub(crate) fn preload_offset_index(&self) -> bool {
self.preload_offset_index
}
pub(crate) fn preload_page_index(&self) -> bool {
self.preload_page_index
}
}
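/// Builder for [`ArrowReader`], configuring batch size, per-file concurrency, row-group
/// filtering, row selection and the Parquet read options.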
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
parquet_read_options: ParquetReadOptions,
}
impl ArrowReaderBuilder {
pub fn new(file_io: FileIO) -> Self {
let num_cpus = available_parallelism().get();
ArrowReaderBuilder {
batch_size: None,
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
parquet_read_options: ParquetReadOptions::builder().build(),
}
}
pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
self.concurrency_limit_data_files = val;
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}
pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
self
}
pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
self
}
pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
self
}
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io.clone(),
delete_file_loader: CachingDeleteFileLoader::new(
self.file_io.clone(),
self.concurrency_limit_data_files,
),
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
parquet_read_options: self.parquet_read_options,
}
}
}
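/// Reads the data files referenced by [`FileScanTask`]s into Arrow record batches, applying
/// projection, predicate filtering and delete handling along the way.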
#[derive(Clone)]
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
delete_file_loader: CachingDeleteFileLoader,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
parquet_read_options: ParquetReadOptions,
}
impl ArrowReader {
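    /// Takes a stream of `FileScanTask`s and reads all of the files. Returns a stream of
    /// Arrow `RecordBatch`es containing the data from the files.
    ///
    /// Tasks are processed sequentially when the data-file concurrency limit is 1, otherwise
    /// they are buffered unordered up to that limit.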
pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;
let parquet_read_options = self.parquet_read_options;
let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
Box::pin(
tasks
.and_then(move |task| {
let file_io = file_io.clone();
Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
parquet_read_options,
)
})
.map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generation failed")
.with_source(err)
})
.try_flatten(),
)
} else {
Box::pin(
tasks
.map_ok(move |task| {
let file_io = file_io.clone();
Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
parquet_read_options,
)
})
.map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generation failed")
.with_source(err)
})
.try_buffer_unordered(concurrency_limit_data_files)
.try_flatten_unordered(concurrency_limit_data_files),
)
};
Ok(stream)
}
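    /// Processes a single [`FileScanTask`]: kicks off loading of its delete files, opens the
    /// Parquet file, applies name mapping or positional fallback ids when the file lacks
    /// field-id metadata, then wires up projection, record-batch transformation, row
    /// filtering, row-group/page pruning and positional-delete row selection before building
    /// the record batch stream.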
async fn process_file_scan_task(
task: FileScanTask,
batch_size: Option<usize>,
file_io: FileIO,
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
parquet_read_options: ParquetReadOptions,
) -> Result<ArrowRecordBatchStream> {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
let mut parquet_read_options = parquet_read_options;
parquet_read_options.preload_page_index = should_load_page_index;
let delete_filter_rx =
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
&task.data_file_path,
&file_io,
task.file_size_in_bytes,
parquet_read_options,
)
.await?;
let missing_field_ids = arrow_metadata
.schema()
.fields()
.iter()
.next()
.is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
let arrow_metadata = if missing_field_ids {
let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
apply_name_mapping_to_arrow_schema(
Arc::clone(arrow_metadata.schema()),
name_mapping,
)?
} else {
add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
};
let options = ArrowReaderOptions::new().with_schema(arrow_schema);
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata with field ID schema",
)
.with_source(e)
},
)?
} else {
arrow_metadata
};
let mut record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
let project_field_ids_without_metadata: Vec<i32> = task
.project_field_ids
.iter()
.filter(|&&id| !is_metadata_field(id))
.copied()
.collect();
let projection_mask = Self::get_arrow_projection_mask(
&project_field_ids_without_metadata,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
record_batch_stream_builder =
record_batch_stream_builder.with_projection(projection_mask.clone());
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
let file_datum = Datum::string(task.data_file_path.clone());
record_batch_transformer_builder =
record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
}
if let (Some(partition_spec), Some(partition_data)) =
(task.partition_spec.clone(), task.partition.clone())
{
record_batch_transformer_builder =
record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
}
let mut record_batch_transformer = record_batch_transformer_builder.build();
if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
let delete_filter = delete_filter_rx.await.unwrap()?;
let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
let final_predicate = match (&task.predicate, delete_predicate) {
(None, None) => None,
(Some(predicate), None) => Some(predicate.clone()),
(None, Some(ref predicate)) => Some(predicate.clone()),
(Some(filter_predicate), Some(delete_predicate)) => {
Some(filter_predicate.clone().and(delete_predicate))
}
};
let mut selected_row_group_indices = None;
let mut row_selection = None;
if task.start != 0 || task.length != 0 {
let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
record_batch_stream_builder.metadata(),
task.start,
task.length,
)?;
selected_row_group_indices = Some(byte_range_filtered_row_groups);
}
if let Some(predicate) = final_predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
&predicate,
)?;
let row_filter = Self::get_row_filter(
&predicate,
record_batch_stream_builder.parquet_schema(),
&iceberg_field_ids,
&field_id_map,
)?;
record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
if row_group_filtering_enabled {
let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
&predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
&task.schema,
)?;
selected_row_group_indices = match selected_row_group_indices {
Some(byte_range_filtered) => {
let intersection: Vec<usize> = byte_range_filtered
.into_iter()
.filter(|idx| predicate_filtered_row_groups.contains(idx))
.collect();
Some(intersection)
}
None => Some(predicate_filtered_row_groups),
};
}
if row_selection_enabled {
row_selection = Some(Self::get_row_selection_for_filter_predicate(
&predicate,
record_batch_stream_builder.metadata(),
&selected_row_group_indices,
&field_id_map,
&task.schema,
)?);
}
}
let positional_delete_indexes = delete_filter.get_delete_vector(&task);
if let Some(positional_delete_indexes) = positional_delete_indexes {
let delete_row_selection = {
let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
Self::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&selected_row_group_indices,
&positional_delete_indexes,
)
}?;
row_selection = match row_selection {
None => Some(delete_row_selection),
Some(filter_row_selection) => {
Some(filter_row_selection.intersection(&delete_row_selection))
}
};
}
if let Some(row_selection) = row_selection {
record_batch_stream_builder =
record_batch_stream_builder.with_row_selection(row_selection);
}
if let Some(selected_row_group_indices) = selected_row_group_indices {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
}
let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => {
record_batch_transformer.process_record_batch(batch)
}
Err(err) => Err(err.into()),
});
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
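    /// Opens the data file through `FileIO` and loads its footer as [`ArrowReaderMetadata`]
    /// using the supplied read options.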
pub(crate) async fn open_parquet_file(
data_file_path: &str,
file_io: &FileIO,
file_size_in_bytes: u64,
parquet_read_options: ParquetReadOptions,
) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
let parquet_file = file_io.new_input(data_file_path)?;
let parquet_reader = parquet_file.reader().await?;
let mut reader = ArrowFileReader::new(
FileMetadata {
size: file_size_in_bytes,
},
parquet_reader,
)
.with_parquet_read_options(parquet_read_options);
let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
})?;
Ok((reader, arrow_metadata))
}
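    /// Builds a [`RowSelection`] that skips positionally deleted rows. Walks the row groups
    /// (honouring `selected_row_groups` when present) and translates the file-global indexes
    /// in the [`DeleteVector`] into alternating select/skip runs.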
fn build_deletes_row_selection(
row_group_metadata_list: &[RowGroupMetaData],
selected_row_groups: &Option<Vec<usize>>,
positional_deletes: &DeleteVector,
) -> Result<RowSelection> {
let mut results: Vec<RowSelector> = Vec::new();
let mut selected_row_groups_idx = 0;
let mut current_row_group_base_idx: u64 = 0;
let mut delete_vector_iter = positional_deletes.iter();
let mut next_deleted_row_idx_opt = delete_vector_iter.next();
for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
let row_group_num_rows = row_group_metadata.num_rows() as u64;
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
if let Some(selected_row_groups) = selected_row_groups {
if selected_row_groups_idx == selected_row_groups.len() {
break;
}
if idx == selected_row_groups[selected_row_groups_idx] {
selected_row_groups_idx += 1;
} else {
delete_vector_iter.advance_to(next_row_group_base_idx);
if let Some(cached_idx) = next_deleted_row_idx_opt
&& cached_idx < next_row_group_base_idx
{
next_deleted_row_idx_opt = delete_vector_iter.next();
}
current_row_group_base_idx += row_group_num_rows;
continue;
}
}
let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => {
if next_deleted_row_idx >= next_row_group_base_idx {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
next_deleted_row_idx
}
_ => {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
};
let mut current_idx = current_row_group_base_idx;
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
if current_idx < next_deleted_row_idx {
let run_length = next_deleted_row_idx - current_idx;
results.push(RowSelector::select(run_length as usize));
current_idx += run_length;
}
let mut run_length = 0;
while next_deleted_row_idx == current_idx
&& next_deleted_row_idx < next_row_group_base_idx
{
run_length += 1;
current_idx += 1;
next_deleted_row_idx_opt = delete_vector_iter.next();
next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => next_deleted_row_idx,
_ => {
results.push(RowSelector::skip(run_length));
break 'chunks;
}
};
}
if run_length > 0 {
results.push(RowSelector::skip(run_length));
}
}
if current_idx < next_row_group_base_idx {
results.push(RowSelector::select(
(next_row_group_base_idx - current_idx) as usize,
));
}
current_row_group_base_idx += row_group_num_rows;
}
Ok(results.into())
}
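    /// Collects the field ids referenced by the predicate and builds the map from field id
    /// to leaf column index, falling back to positional ids when the file carries none.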
fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut collector, predicate)?;
let iceberg_field_ids = collector.field_ids();
let field_id_map = match build_field_id_map(parquet_schema)? {
Some(map) => map,
None => build_fallback_field_id_map(parquet_schema),
};
Ok((iceberg_field_ids, field_id_map))
}
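    /// Recursively collects the ids of all primitive (leaf) fields nested under `field`.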
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
match field.field_type.as_ref() {
Type::Primitive(_) => {
field_ids.push(field.id);
}
Type::Struct(struct_type) => {
for nested_field in struct_type.fields() {
Self::include_leaf_field_id(nested_field, field_ids);
}
}
Type::List(list_type) => {
Self::include_leaf_field_id(&list_type.element_field, field_ids);
}
Type::Map(map_type) => {
Self::include_leaf_field_id(&map_type.key_field, field_ids);
Self::include_leaf_field_id(&map_type.value_field, field_ids);
}
}
}
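    /// Computes the [`ProjectionMask`] for the requested field ids. Uses the positional
    /// fallback when the file has no field-id metadata; otherwise matches leaf field ids
    /// against the file schema and validates that any type promotion (int to long, float to
    /// double, decimal precision widening, fixed[16] to uuid) is allowed.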
fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
fn type_promotion_is_valid(
file_type: Option<&PrimitiveType>,
projected_type: Option<&PrimitiveType>,
) -> bool {
match (file_type, projected_type) {
(Some(lhs), Some(rhs)) if lhs == rhs => true,
(Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
(Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
(
Some(PrimitiveType::Decimal {
precision: file_precision,
scale: file_scale,
}),
Some(PrimitiveType::Decimal {
precision: requested_precision,
scale: requested_scale,
}),
) if requested_precision >= file_precision && file_scale == requested_scale => true,
(Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
_ => false,
}
}
if field_ids.is_empty() {
return Ok(ProjectionMask::all());
}
if use_fallback {
Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
} else {
let mut leaf_field_ids = vec![];
for field_id in field_ids {
let field = iceberg_schema_of_task.field_by_id(*field_id);
if let Some(field) = field {
Self::include_leaf_field_id(field, &mut leaf_field_ids);
}
}
Self::get_arrow_projection_mask_with_field_ids(
&leaf_field_ids,
iceberg_schema_of_task,
parquet_schema,
arrow_schema,
type_promotion_is_valid,
)
}
}
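    /// Matches the requested leaf field ids against the file's Arrow leaves via their
    /// field-id metadata and builds a leaf-level projection mask; leaves whose file type
    /// cannot be promoted to the requested type are excluded.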
fn get_arrow_projection_mask_with_field_ids(
leaf_field_ids: &[i32],
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
) -> Result<ProjectionMask> {
let mut column_map = HashMap::new();
let fields = arrow_schema.fields();
let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
let projected_arrow_schema = ArrowSchema::new_with_metadata(
fields.filter_leaves(|_, f| {
f.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|field_id| i32::from_str(field_id).ok())
.is_some_and(|field_id| {
projected_fields.insert((*f).clone(), field_id);
leaf_field_ids.contains(&field_id)
})
}),
arrow_schema.metadata().clone(),
);
let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
fields.filter_leaves(|idx, field| {
let Some(field_id) = projected_fields.get(field).cloned() else {
return false;
};
let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
return false;
}
if !type_promotion_is_valid(
parquet_iceberg_field
.unwrap()
.field_type
.as_primitive_type(),
iceberg_field.unwrap().field_type.as_primitive_type(),
) {
return false;
}
column_map.insert(field_id, idx);
true
});
let mut indices = vec![];
for field_id in leaf_field_ids {
if let Some(col_idx) = column_map.get(field_id) {
indices.push(*col_idx);
}
}
if indices.is_empty() {
Ok(ProjectionMask::all())
} else {
Ok(ProjectionMask::leaves(parquet_schema, indices))
}
}
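    /// Positional fallback projection for files written without field ids: field id `n`
    /// maps to the root column at position `n - 1`.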
fn get_arrow_projection_mask_fallback(
field_ids: &[i32],
parquet_schema: &SchemaDescriptor,
) -> Result<ProjectionMask> {
let parquet_root_fields = parquet_schema.root_schema().get_fields();
let mut root_indices = vec![];
for field_id in field_ids.iter() {
let parquet_pos = (*field_id - 1) as usize;
if parquet_pos < parquet_root_fields.len() {
root_indices.push(parquet_pos);
}
}
if root_indices.is_empty() {
Ok(ProjectionMask::all())
} else {
Ok(ProjectionMask::roots(parquet_schema, root_indices))
}
}
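    /// Builds an Arrow [`RowFilter`] from the bound predicate, projecting only the leaf
    /// columns the predicate references.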
fn get_row_filter(
predicates: &BoundPredicate,
parquet_schema: &SchemaDescriptor,
iceberg_field_ids: &HashSet<i32>,
field_id_map: &HashMap<i32, usize>,
) -> Result<RowFilter> {
let mut column_indices = iceberg_field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();
column_indices.sort();
let mut converter = PredicateConverter {
parquet_schema,
column_map: field_id_map,
column_indices: &column_indices,
};
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}
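    /// Prunes row groups by evaluating the predicate against row-group statistics, returning
    /// the indices of row groups that may contain matching rows.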
fn get_selected_row_group_indices(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<Vec<usize>> {
let row_groups_metadata = parquet_metadata.row_groups();
let mut results = Vec::with_capacity(row_groups_metadata.len());
for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
if RowGroupMetricsEvaluator::eval(
predicate,
row_group_metadata,
field_id_map,
snapshot_schema,
)? {
results.push(idx);
}
}
Ok(results)
}
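    /// Uses the Parquet page index (column and offset indexes) to compute a [`RowSelection`]
    /// for the predicate, evaluating each selected row group. Errors if the file metadata
    /// does not include the page index.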
fn get_row_selection_for_filter_predicate(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<RowSelection> {
let Some(column_index) = parquet_metadata.column_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain a column index",
));
};
let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain an offset index",
));
};
if let Some(selected_row_groups) = selected_row_groups
&& selected_row_groups.is_empty()
{
return Ok(RowSelection::from(Vec::new()));
}
let mut selected_row_groups_idx = 0;
let page_index = column_index
.iter()
.enumerate()
.zip(offset_index)
.zip(parquet_metadata.row_groups());
let mut results = Vec::new();
for (((idx, column_index), offset_index), row_group_metadata) in page_index {
if let Some(selected_row_groups) = selected_row_groups {
if idx == selected_row_groups[selected_row_groups_idx] {
selected_row_groups_idx += 1;
} else {
continue;
}
}
let selections_for_page = PageIndexEvaluator::eval(
predicate,
column_index,
offset_index,
row_group_metadata,
field_id_map,
snapshot_schema,
)?;
results.push(selections_for_page);
if let Some(selected_row_groups) = selected_row_groups
&& selected_row_groups_idx == selected_row_groups.len()
{
break;
}
}
Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}
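    /// Returns the indices of row groups whose compressed byte range overlaps
    /// `[start, start + length)`. Offsets are accumulated from byte 4, immediately after the
    /// leading "PAR1" magic.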
fn filter_row_groups_by_byte_range(
parquet_metadata: &Arc<ParquetMetaData>,
start: u64,
length: u64,
) -> Result<Vec<usize>> {
let row_groups = parquet_metadata.row_groups();
let mut selected = Vec::new();
let end = start + length;
let mut current_byte_offset = 4u64;
for (idx, row_group) in row_groups.iter().enumerate() {
let row_group_size = row_group.compressed_size() as u64;
let row_group_end = current_byte_offset + row_group_size;
if current_byte_offset < end && start < row_group_end {
selected.push(idx);
}
current_byte_offset = row_group_end;
}
Ok(selected)
}
}
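/// Builds a map of Parquet field id to leaf column index. Returns `Ok(None)` if any leaf
/// column lacks a field id, signalling that positional fallback ids should be used instead.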
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
let mut column_map = HashMap::new();
for (idx, field) in parquet_schema.columns().iter().enumerate() {
let field_type = field.self_type();
match field_type {
ParquetType::PrimitiveType { basic_info, .. } => {
if !basic_info.has_id() {
return Ok(None);
}
column_map.insert(basic_info.id(), idx);
}
ParquetType::GroupType { .. } => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column in schema should be primitive type but got {field_type:?}"
),
));
}
};
}
Ok(Some(column_map))
}
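/// Positional fallback mapping: assigns field id `index + 1` to each leaf column.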
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
let mut column_map = HashMap::new();
for (idx, _field) in parquet_schema.columns().iter().enumerate() {
let field_id = (idx + 1) as i32;
column_map.insert(field_id, idx);
}
column_map
}
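/// Assigns Iceberg field ids to the top-level Arrow fields by looking up each field name in
/// the table's [`NameMapping`]; names absent from the mapping are left without a field id.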
fn apply_name_mapping_to_arrow_schema(
arrow_schema: ArrowSchemaRef,
name_mapping: &NameMapping,
) -> Result<Arc<ArrowSchema>> {
debug_assert!(
arrow_schema
.fields()
.iter()
.next()
.is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
"Schema already has field IDs - name mapping should not be applied"
);
use arrow_schema::Field;
let fields_with_mapped_ids: Vec<_> = arrow_schema
.fields()
.iter()
.map(|field| {
let mapped_field_opt = name_mapping
.fields()
.iter()
.find(|f| f.names().contains(&field.name().to_string()));
let mut metadata = field.metadata().clone();
if let Some(mapped_field) = mapped_field_opt
&& let Some(field_id) = mapped_field.field_id()
{
metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
}
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
.with_metadata(metadata)
})
.collect();
Ok(Arc::new(ArrowSchema::new_with_metadata(
fields_with_mapped_ids,
arrow_schema.metadata().clone(),
)))
}
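/// Assigns positional (1-based) fallback field ids to the top-level fields of an Arrow
/// schema that carries no field-id metadata.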
fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
debug_assert!(
arrow_schema
.fields()
.iter()
.next()
.is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
"Schema already has field IDs"
);
use arrow_schema::Field;
let fields_with_fallback_ids: Vec<_> = arrow_schema
.fields()
.iter()
.enumerate()
.map(|(pos, field)| {
let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
.with_metadata(metadata)
})
.collect();
Arc::new(ArrowSchema::new_with_metadata(
fields_with_fallback_ids,
arrow_schema.metadata().clone(),
))
}
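/// A [`BoundPredicateVisitor`] that collects the ids of all fields referenced by a predicate.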
struct CollectFieldIdVisitor {
field_ids: HashSet<i32>,
}
impl CollectFieldIdVisitor {
fn field_ids(self) -> HashSet<i32> {
self.field_ids
}
}
impl BoundPredicateVisitor for CollectFieldIdVisitor {
type T = ();
fn always_true(&mut self) -> Result<()> {
Ok(())
}
fn always_false(&mut self) -> Result<()> {
Ok(())
}
fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
Ok(())
}
fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
Ok(())
}
fn not(&mut self, _inner: ()) -> Result<()> {
Ok(())
}
fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn less_than(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn less_than_or_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn greater_than(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn greater_than_or_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn starts_with(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_starts_with(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn r#in(
&mut self,
reference: &BoundReference,
_literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_in(
&mut self,
reference: &BoundReference,
_literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
}
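/// Converts a [`BoundPredicate`] into a closure that evaluates record batches to a boolean
/// filter mask. Predicates referencing columns that are absent from the file degrade to
/// constant true/false results.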
struct PredicateConverter<'a> {
pub parquet_schema: &'a SchemaDescriptor,
pub column_map: &'a HashMap<i32, usize>,
pub column_indices: &'a Vec<usize>,
}
impl PredicateConverter<'_> {
fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
if let Some(column_idx) = self.column_map.get(&reference.field().id) {
if self.parquet_schema.get_column_root(*column_idx).is_group() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
reference.field().name
),
));
}
let index = self
.column_indices
.iter()
.position(|&idx| idx == *column_idx)
.ok_or(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates cannot be found in the required column indices.",
reference.field().name
),
))?;
Ok(Some(index))
} else {
Ok(None)
}
}
fn build_always_true(&self) -> Result<Box<PredicateResult>> {
Ok(Box::new(|batch| {
Ok(BooleanArray::from(vec![true; batch.num_rows()]))
}))
}
fn build_always_false(&self) -> Result<Box<PredicateResult>> {
Ok(Box::new(|batch| {
Ok(BooleanArray::from(vec![false; batch.num_rows()]))
}))
}
}
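/// Returns the column at `column_idx` from the batch; struct columns are not yet supported.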
fn project_column(
batch: &RecordBatch,
column_idx: usize,
) -> std::result::Result<ArrayRef, ArrowError> {
let column = batch.column(column_idx);
match column.data_type() {
DataType::Struct(_) => Err(ArrowError::SchemaError(
"Does not support struct column yet.".to_string(),
)),
_ => Ok(column.clone()),
}
}
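/// Boxed closure produced by [`PredicateConverter`]: maps a `RecordBatch` to the
/// `BooleanArray` filter mask used by the Parquet row filter.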
type PredicateResult =
dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
impl BoundPredicateVisitor for PredicateConverter<'_> {
type T = Box<PredicateResult>;
fn always_true(&mut self) -> Result<Box<PredicateResult>> {
self.build_always_true()
}
fn always_false(&mut self) -> Result<Box<PredicateResult>> {
self.build_always_false()
}
fn and(
&mut self,
mut lhs: Box<PredicateResult>,
mut rhs: Box<PredicateResult>,
) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let left = lhs(batch.clone())?;
let right = rhs(batch)?;
and_kleene(&left, &right)
}))
}
fn or(
&mut self,
mut lhs: Box<PredicateResult>,
mut rhs: Box<PredicateResult>,
) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let left = lhs(batch.clone())?;
let right = rhs(batch)?;
or_kleene(&left, &right)
}))
}
fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let pred_ret = inner(batch)?;
not(&pred_ret)
}))
}
fn is_null(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_null(&column)
}))
} else {
self.build_always_true()
}
}
fn not_null(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_not_null(&column)
}))
} else {
self.build_always_false()
}
}
fn is_nan(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference)?.is_some() {
self.build_always_true()
} else {
self.build_always_false()
}
}
fn not_nan(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference)?.is_some() {
self.build_always_false()
} else {
self.build_always_true()
}
}
fn less_than(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
lt(&left, literal.as_ref())
}))
} else {
self.build_always_true()
}
}
fn less_than_or_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
lt_eq(&left, literal.as_ref())
}))
} else {
self.build_always_true()
}
}
fn greater_than(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
gt(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn greater_than_or_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
gt_eq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
eq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn not_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
neq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn starts_with(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
starts_with(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn not_starts_with(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let literal = try_cast_literal(&literal, left.data_type())?;
not(&starts_with(&left, literal.as_ref())?)
}))
} else {
self.build_always_true()
}
}
fn r#in(
&mut self,
reference: &BoundReference,
literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literals: Vec<_> = literals
.iter()
.map(|lit| get_arrow_datum(lit).unwrap())
.collect();
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
for literal in &literals {
let literal = try_cast_literal(literal, left.data_type())?;
acc = or(&acc, &eq(&left, literal.as_ref())?)?
}
Ok(acc)
}))
} else {
self.build_always_false()
}
}
fn not_in(
&mut self,
reference: &BoundReference,
literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literals: Vec<_> = literals
.iter()
.map(|lit| get_arrow_datum(lit).unwrap())
.collect();
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
for literal in &literals {
let literal = try_cast_literal(literal, left.data_type())?;
acc = and(&acc, &neq(&left, literal.as_ref())?)?
}
Ok(acc)
}))
} else {
self.build_always_true()
}
}
}
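/// An [`AsyncFileReader`] backed by a `FileIO` [`FileRead`]. Byte ranges are coalesced and
/// fetched concurrently, and metadata loading honours the prefetch hint and index policies
/// from [`ParquetReadOptions`].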
pub struct ArrowFileReader {
meta: FileMetadata,
parquet_read_options: ParquetReadOptions,
r: Box<dyn FileRead>,
}
impl ArrowFileReader {
pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
Self {
meta,
parquet_read_options: ParquetReadOptions::builder().build(),
r,
}
}
pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
self.parquet_read_options = options;
self
}
}
impl AsyncFileReader for ArrowFileReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
.read(range.start..range.end)
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
)
}
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);
async move {
let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
let r = &self.r;
let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
.map(|range| async move {
r.read(range)
.await
.map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
})
.buffered(concurrency)
.try_collect()
.await?;
Ok(ranges
.iter()
.map(|range| {
let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
let fetch_range = &fetch_ranges[idx];
let fetch_bytes = &fetched[idx];
let start = (range.start - fetch_range.start) as usize;
let end = (range.end - fetch_range.start) as usize;
fetch_bytes.slice(start..end.min(fetch_bytes.len()))
})
.collect())
}
.boxed()
}
fn get_metadata(
&mut self,
_options: Option<&'_ ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
let reader = ParquetMetaDataReader::new()
.with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
.with_page_index_policy(PageIndexPolicy::from(
self.parquet_read_options.preload_page_index(),
))
.with_column_index_policy(PageIndexPolicy::from(
self.parquet_read_options.preload_column_index(),
))
.with_offset_index_policy(PageIndexPolicy::from(
self.parquet_read_options.preload_offset_index(),
));
let size = self.meta.size;
let meta = reader.load_and_finish(self, size).await?;
Ok(Arc::new(meta))
}
.boxed()
}
}
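/// Merges sorted byte ranges that overlap or whose gap is at most `coalesce` bytes, so that
/// nearby reads can be served by a single fetch.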
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
if ranges.is_empty() {
return vec![];
}
let mut ranges = ranges.to_vec();
ranges.sort_unstable_by_key(|r| r.start);
let mut merged = Vec::with_capacity(ranges.len());
let mut start_idx = 0;
let mut end_idx = 1;
while start_idx != ranges.len() {
let mut range_end = ranges[start_idx].end;
while end_idx != ranges.len()
&& ranges[end_idx]
.start
.checked_sub(range_end)
.map(|delta| delta <= coalesce)
.unwrap_or(true)
{
range_end = range_end.max(ranges[end_idx].end);
end_idx += 1;
}
merged.push(ranges[start_idx].start..range_end);
start_idx = end_idx;
end_idx += 1;
}
merged
}
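/// Casts a scalar literal to the column's Arrow data type when the two differ (for example a
/// `Utf8` literal compared against a `LargeUtf8` column); returns the literal unchanged when
/// the types already match.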
fn try_cast_literal(
literal: &Arc<dyn ArrowDatum + Send + Sync>,
column_type: &DataType,
) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
let literal_array = literal.get().0;
if literal_array.data_type() == column_type {
return Ok(Arc::clone(literal));
}
let literal_array = cast(literal_array, column_type)?;
Ok(Arc::new(Scalar::new(literal_array)))
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::basic::Compression;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use roaring::RoaringTreemap;
use tempfile::TempDir;
use crate::ErrorKind;
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
use crate::delete_vector::DeleteVector;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
use crate::io::FileIO;
use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
use crate::spec::{
DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
};
fn table_schema_simple() -> SchemaRef {
Arc::new(
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
])
.build()
.unwrap(),
)
}
#[test]
fn test_collect_field_id() {
let schema = table_schema_simple();
let expr = Reference::new("qux").is_null();
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_collect_field_id_with_and() {
let schema = table_schema_simple();
let expr = Reference::new("qux")
.is_null()
.and(Reference::new("baz").is_null());
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
expected.insert(3);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_collect_field_id_with_or() {
let schema = table_schema_simple();
let expr = Reference::new("qux")
.is_null()
.or(Reference::new("baz").is_null());
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
expected.insert(3);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_arrow_projection_mask() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_fields(vec![
NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(
3,
"c3",
Type::Primitive(PrimitiveType::Decimal {
precision: 38,
scale: 3,
}),
)
.into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
]));
let message_type = "
message schema {
required binary c1 (STRING) = 1;
optional int32 c2 (INTEGER(8,true)) = 2;
optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
";
let parquet_type = parse_message_type(message_type).expect("should parse schema");
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
let err = ArrowReader::get_arrow_projection_mask(
&[1, 2, 3],
&schema,
&parquet_schema,
&arrow_schema,
false,
)
.unwrap_err();
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert_eq!(
err.to_string(),
"DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
);
let err = ArrowReader::get_arrow_projection_mask(
&[1, 3],
&schema,
&parquet_schema,
&arrow_schema,
false,
)
.unwrap_err();
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert_eq!(
err.to_string(),
"DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
);
let mask = ArrowReader::get_arrow_projection_mask(
&[1],
&schema,
&parquet_schema,
&arrow_schema,
false,
)
.expect("Some ProjectionMask");
assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
}
#[tokio::test]
async fn test_kleene_logic_or_behaviour() {
let predicate = Reference::new("a")
.is_null()
.or(Reference::new("a").equal_to(Datum::string("foo")));
let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
let expected = vec![None, Some("foo".to_string())];
let (file_io, schema, table_location, _temp_dir) =
setup_kleene_logic(data_for_col_a, DataType::Utf8);
let reader = ArrowReaderBuilder::new(file_io).build();
let result_data = test_perform_read(predicate, schema, table_location, reader).await;
assert_eq!(result_data, expected);
}
#[tokio::test]
async fn test_kleene_logic_and_behaviour() {
let predicate = Reference::new("a")
.is_not_null()
.and(Reference::new("a").not_equal_to(Datum::string("foo")));
let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
let expected = vec![Some("bar".to_string())];
let (file_io, schema, table_location, _temp_dir) =
setup_kleene_logic(data_for_col_a, DataType::Utf8);
let reader = ArrowReaderBuilder::new(file_io).build();
let result_data = test_perform_read(predicate, schema, table_location, reader).await;
assert_eq!(result_data, expected);
}
#[tokio::test]
async fn test_predicate_cast_literal() {
let predicates = vec![
(Reference::new("a").equal_to(Datum::string("foo")), vec![
Some("foo".to_string()),
]),
(
Reference::new("a").not_equal_to(Datum::string("foo")),
vec![Some("bar".to_string())],
),
(Reference::new("a").starts_with(Datum::string("f")), vec![
Some("foo".to_string()),
]),
(
Reference::new("a").not_starts_with(Datum::string("f")),
vec![Some("bar".to_string())],
),
(Reference::new("a").less_than(Datum::string("foo")), vec![
Some("bar".to_string()),
]),
(
Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
vec![Some("foo".to_string()), Some("bar".to_string())],
),
(
Reference::new("a").greater_than(Datum::string("bar")),
vec![Some("foo".to_string())],
),
(
Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
vec![Some("foo".to_string())],
),
(
Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
vec![Some("foo".to_string())],
),
(
Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
vec![Some("bar".to_string())],
),
];
let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
let (file_io, schema, table_location, _temp_dir) =
setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
let reader = ArrowReaderBuilder::new(file_io).build();
for (predicate, expected) in predicates {
println!("testing predicate {predicate}");
let result_data = test_perform_read(
predicate.clone(),
schema.clone(),
table_location.clone(),
reader.clone(),
)
.await;
assert_eq!(result_data, expected, "predicate={predicate}");
}
}
async fn test_perform_read(
predicate: Predicate,
schema: SchemaRef,
table_location: String,
reader: ArrowReader,
) -> Vec<Option<String>> {
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
result[0].columns()[0]
.as_string_opt::<i32>()
.unwrap()
.iter()
.map(|v| v.map(ToOwned::to_owned))
.collect::<Vec<_>>()
}
fn setup_kleene_logic(
data_for_col_a: Vec<Option<String>>,
col_a_type: DataType,
) -> (FileIO, SchemaRef, String, TempDir) {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let col = match col_a_type {
DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
_ => panic!("unexpected col_a_type"),
};
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
(file_io, schema, table_location, tmp_dir)
}
#[test]
fn test_build_deletes_row_selection() {
let schema_descr = get_test_schema_descr();
let mut columns = vec![];
for ptr in schema_descr.columns() {
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
columns.push(column);
}
let row_groups_metadata = vec![
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
];
let selected_row_groups = Some(vec![1, 3]);
let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999, 1000, 1010, 1011, 1012, 1498, 1499, 1500, 1501, 1600, 1999,
            2000, 2001, 2100, 2200, 2201, 2202, 2999, 3000,
        ]);
let positional_deletes = DeleteVector::new(positional_deletes);
let result = ArrowReader::build_deletes_row_selection(
&row_groups_metadata,
&selected_row_groups,
&positional_deletes,
)
.unwrap();
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(9),
RowSelector::skip(3),
RowSelector::select(485),
RowSelector::skip(4),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(99),
RowSelector::skip(3),
RowSelector::select(796),
RowSelector::skip(1),
]);
assert_eq!(result, expected);
let result = ArrowReader::build_deletes_row_selection(
&row_groups_metadata,
&None,
&positional_deletes,
)
.unwrap();
let expected = RowSelection::from(vec![
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(3),
RowSelector::select(992),
RowSelector::skip(3),
RowSelector::select(9),
RowSelector::skip(3),
RowSelector::select(485),
RowSelector::skip(4),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(398),
RowSelector::skip(3),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(99),
RowSelector::skip(3),
RowSelector::select(796),
RowSelector::skip(2),
RowSelector::select(499),
]);
assert_eq!(result, expected);
}
fn build_test_row_group_meta(
schema_descr: SchemaDescPtr,
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
ordinal: i16,
) -> RowGroupMetaData {
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(num_rows)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.set_ordinal(ordinal)
.build()
.unwrap()
}
fn get_test_schema_descr() -> SchemaDescPtr {
use parquet::schema::types::Type as SchemaType;
let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![
Arc::new(
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
.build()
.unwrap(),
),
])
.build()
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
#[tokio::test]
async fn test_file_splits_respect_byte_ranges() {
use arrow_array::Int32Array;
use parquet::file::reader::{FileReader, SerializedFileReader};
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_path = format!("{table_location}/multi_row_group.parquet");
let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(0..100).collect::<Vec<i32>>(),
))])
.unwrap();
let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(100..200).collect::<Vec<i32>>(),
))])
.unwrap();
let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
(200..300).collect::<Vec<i32>>(),
))])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(100)
.build();
let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
writer.write(&batch1).expect("Writing batch 1");
writer.write(&batch2).expect("Writing batch 2");
writer.write(&batch3).expect("Writing batch 3");
writer.close().unwrap();
let file = File::open(&file_path).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
println!("File has {} row groups", metadata.num_row_groups());
assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
let row_group_0 = metadata.row_group(0);
let row_group_1 = metadata.row_group(1);
let row_group_2 = metadata.row_group(2);
        // Row-group data begins immediately after the 4-byte "PAR1" magic.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
let file_end = rg2_start + row_group_2.compressed_size() as u64;
println!(
"Row group 0: {} rows, starts at byte {}, {} bytes compressed",
row_group_0.num_rows(),
rg0_start,
row_group_0.compressed_size()
);
println!(
"Row group 1: {} rows, starts at byte {}, {} bytes compressed",
row_group_1.num_rows(),
rg1_start,
row_group_1.compressed_size()
);
println!(
"Row group 2: {} rows, starts at byte {}, {} bytes compressed",
row_group_2.num_rows(),
rg2_start,
row_group_2.compressed_size()
);
let file_io = FileIO::new_with_fs();
let reader = ArrowReaderBuilder::new(file_io).build();
let task1 = FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: rg0_start,
length: row_group_0.compressed_size() as u64,
record_count: Some(100),
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
let task2 = FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: rg1_start,
length: file_end - rg1_start,
record_count: Some(200),
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
let result1 = reader
.clone()
.read(tasks1)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
println!(
"Task 1 (bytes {}-{}) returned {} rows",
rg0_start,
rg0_start + row_group_0.compressed_size() as u64,
total_rows_task1
);
let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
let result2 = reader
.read(tasks2)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
assert_eq!(
total_rows_task1, 100,
"Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
);
assert_eq!(
total_rows_task2, 200,
"Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
);
if total_rows_task1 > 0 {
let first_batch = &result1[0];
let id_col = first_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
let last_val = id_col.value(id_col.len() - 1);
println!("Task 1 data range: {first_val} to {last_val}");
assert_eq!(first_val, 0, "Task 1 should start with id=0");
assert_eq!(last_val, 99, "Task 1 should end with id=99");
}
if total_rows_task2 > 0 {
let first_batch = &result2[0];
let id_col = first_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
println!("Task 2 first value: {first_val}");
assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
}
}
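// Verifies that a file written with an older schema can be read against a newer
// schema that added an optional column; the missing column comes back as all nulls.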
#[tokio::test]
async fn test_schema_evolution_add_column() {
use arrow_array::{Array, Int32Array};
let new_schema = Arc::new(
Schema::builder()
.with_schema_id(2)
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/old_file.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: new_schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);
let col_a = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(col_a.values(), &[1, 2, 3]);
let col_b = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(col_b.null_count(), 3);
assert!(col_b.is_null(0));
assert!(col_b.is_null(1));
assert!(col_b.is_null(2));
}
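// Verifies that a positional delete pointing into the second row group
// (position 199, id = 200) is applied when the whole two-row-group file is scanned.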
#[tokio::test]
async fn test_position_delete_across_multiple_row_groups() {
use arrow_array::{Int32Array, Int64Array};
use parquet::file::reader::{FileReader, SerializedFileReader};
const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let table_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let data_file_path = format!("{table_location}/data.parquet");
let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(1..=100),
)])
.unwrap();
let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(101..=200),
)])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(100)
.build();
let file = File::create(&data_file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
writer.write(&batch1).expect("Writing batch 1");
writer.write(&batch2).expect("Writing batch 2");
writer.close().unwrap();
let verify_file = File::open(&data_file_path).unwrap();
let verify_reader = SerializedFileReader::new(verify_file).unwrap();
assert_eq!(
verify_reader.metadata().num_row_groups(),
2,
"Should have 2 row groups"
);
let delete_file_path = format!("{table_location}/deletes.parquet");
let delete_schema = Arc::new(ArrowSchema::new(vec![
Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)])),
]));
let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
Arc::new(Int64Array::from_iter_values(vec![199i64])),
])
.unwrap();
let delete_props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let delete_file = File::create(&delete_file_path).unwrap();
let mut delete_writer =
ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
delete_writer.write(&delete_batch).unwrap();
delete_writer.close().unwrap();
let file_io = FileIO::new_with_fs();
let reader = ArrowReaderBuilder::new(file_io).build();
let task = FileScanTask {
file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
start: 0,
length: 0,
record_count: Some(200),
data_file_path: data_file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: table_schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
println!("Total rows read: {total_rows}");
println!("Expected: 199 rows (deleted row 199 which had id=200)");
assert_eq!(
total_rows, 199,
"Expected 199 rows after deleting row 199, but got {total_rows} rows. \
The bug causes position deletes in later row groups to be ignored."
);
let all_ids: Vec<i32> = result
.iter()
.flat_map(|batch| {
batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.iter()
.copied()
})
.collect();
assert!(
!all_ids.contains(&200),
"Row with id=200 should be deleted but was found in results"
);
let expected_ids: Vec<i32> = (1..=199).collect();
assert_eq!(
all_ids, expected_ids,
"Should have ids 1-199 but got different values"
);
}
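// Verifies that a positional delete inside the selected row group is still applied
// when the scan task covers only that row group's byte range (row group 0 is skipped).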
#[tokio::test]
async fn test_position_delete_with_row_group_selection() {
use arrow_array::{Int32Array, Int64Array};
use parquet::file::reader::{FileReader, SerializedFileReader};
const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let table_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let data_file_path = format!("{table_location}/data.parquet");
let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(1..=100),
)])
.unwrap();
let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(101..=200),
)])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(100)
.build();
let file = File::create(&data_file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
writer.write(&batch1).expect("Writing batch 1");
writer.write(&batch2).expect("Writing batch 2");
writer.close().unwrap();
let verify_file = File::open(&data_file_path).unwrap();
let verify_reader = SerializedFileReader::new(verify_file).unwrap();
assert_eq!(
verify_reader.metadata().num_row_groups(),
2,
"Should have 2 row groups"
);
let delete_file_path = format!("{table_location}/deletes.parquet");
let delete_schema = Arc::new(ArrowSchema::new(vec![
Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)])),
]));
let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
Arc::new(Int64Array::from_iter_values(vec![199i64])),
])
.unwrap();
let delete_props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let delete_file = File::create(&delete_file_path).unwrap();
let mut delete_writer =
ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
delete_writer.write(&delete_batch).unwrap();
delete_writer.close().unwrap();
let metadata_file = File::open(&data_file_path).unwrap();
let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
let metadata = metadata_reader.metadata();
let row_group_0 = metadata.row_group(0);
let row_group_1 = metadata.row_group(1);
// The first row group's data starts right after the 4-byte Parquet magic ("PAR1").
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
let rg1_length = row_group_1.compressed_size() as u64;
println!(
"Row group 0: starts at byte {}, {} bytes compressed",
rg0_start,
row_group_0.compressed_size()
);
println!(
"Row group 1: starts at byte {}, {} bytes compressed",
rg1_start,
row_group_1.compressed_size()
);
let file_io = FileIO::new_with_fs();
let reader = ArrowReaderBuilder::new(file_io).build();
let task = FileScanTask {
file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
start: rg1_start,
length: rg1_length,
record_count: Some(100),
data_file_path: data_file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: table_schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
println!("Total rows read from row group 1: {total_rows}");
println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
assert_eq!(
total_rows, 99,
"Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
The bug causes position deletes to be lost when advance_to() is followed by next() \
while skipping unselected row groups."
);
let all_ids: Vec<i32> = result
.iter()
.flat_map(|batch| {
batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.iter()
.copied()
})
.collect();
assert!(
!all_ids.contains(&200),
"Row with id=200 should be deleted but was found in results"
);
let expected_ids: Vec<i32> = (101..=199).collect();
assert_eq!(
all_ids, expected_ids,
"Should have ids 101-199 but got different values"
);
}
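// Verifies that a positional delete falling in a skipped row group (position 0 in
// row group 0) does not remove any rows from the selected row group, and that the
// read completes instead of hanging.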
#[tokio::test]
async fn test_position_delete_in_skipped_row_group() {
use arrow_array::{Int32Array, Int64Array};
use parquet::file::reader::{FileReader, SerializedFileReader};
const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let table_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));
let data_file_path = format!("{table_location}/data.parquet");
let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(1..=100),
)])
.unwrap();
let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from_iter_values(101..=200),
)])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_size(100)
.build();
let file = File::create(&data_file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
writer.write(&batch1).expect("Writing batch 1");
writer.write(&batch2).expect("Writing batch 2");
writer.close().unwrap();
let verify_file = File::open(&data_file_path).unwrap();
let verify_reader = SerializedFileReader::new(verify_file).unwrap();
assert_eq!(
verify_reader.metadata().num_row_groups(),
2,
"Should have 2 row groups"
);
let delete_file_path = format!("{table_location}/deletes.parquet");
let delete_schema = Arc::new(ArrowSchema::new(vec![
Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)])),
]));
let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
Arc::new(Int64Array::from_iter_values(vec![0i64])),
])
.unwrap();
let delete_props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let delete_file = File::create(&delete_file_path).unwrap();
let mut delete_writer =
ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
delete_writer.write(&delete_batch).unwrap();
delete_writer.close().unwrap();
let metadata_file = File::open(&data_file_path).unwrap();
let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
let metadata = metadata_reader.metadata();
let row_group_0 = metadata.row_group(0);
let row_group_1 = metadata.row_group(1);
// The first row group's data starts right after the 4-byte Parquet magic ("PAR1").
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
let rg1_length = row_group_1.compressed_size() as u64;
let file_io = FileIO::new_with_fs();
let reader = ArrowReaderBuilder::new(file_io).build();
let task = FileScanTask {
file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
start: rg1_start,
length: rg1_length,
record_count: Some(100),
data_file_path: data_file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: table_schema.clone(),
project_field_ids: vec![1],
predicate: None,
deletes: vec![FileScanTaskDeleteFile {
file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
file_path: delete_file_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
};
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 100,
"Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
);
let all_ids: Vec<i32> = result
.iter()
.flat_map(|batch| {
batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>()
.values()
.iter()
.copied()
})
.collect();
let expected_ids: Vec<i32> = (101..=200).collect();
assert_eq!(
all_ids, expected_ids,
"Should have ids 101-200 (all of row group 1)"
);
}
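// Verifies that a Parquet file written without field-id metadata can still be read
// with a full projection and returns the expected column values.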
#[tokio::test]
async fn test_read_parquet_file_without_field_ids() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let name_data = vec!["Alice", "Bob", "Charlie"];
let age_data = vec![30, 25, 35];
use arrow_array::Int32Array;
let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 2);
let name_array = batch.column(0).as_string::<i32>();
assert_eq!(name_array.value(0), "Alice");
assert_eq!(name_array.value(1), "Bob");
assert_eq!(name_array.value(2), "Charlie");
let age_array = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(age_array.value(0), 30);
assert_eq!(age_array.value(1), 25);
assert_eq!(age_array.value(2), 35);
}
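// Verifies partial projection (field ids 1 and 3) of a four-column file written
// without field-id metadata.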
#[tokio::test]
async fn test_read_parquet_without_field_ids_partial_projection() {
use arrow_array::Int32Array;
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("col1", DataType::Utf8, false),
Field::new("col2", DataType::Int32, false),
Field::new("col3", DataType::Utf8, false),
Field::new("col4", DataType::Int32, false),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
col1_data, col2_data, col3_data, col4_data,
])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 2);
let col1_array = batch.column(0).as_string::<i32>();
assert_eq!(col1_array.value(0), "a");
assert_eq!(col1_array.value(1), "b");
let col3_array = batch.column(1).as_string::<i32>();
assert_eq!(col3_array.value(0), "c");
assert_eq!(col3_array.value(1), "d");
}
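// Verifies schema evolution on a file without field-id metadata: the added optional
// "city" column comes back as all nulls while the existing columns keep their values.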
#[tokio::test]
async fn test_read_parquet_without_field_ids_schema_evolution() {
use arrow_array::{Array, Int32Array};
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let name_array = batch.column(0).as_string::<i32>();
assert_eq!(name_array.value(0), "Alice");
assert_eq!(name_array.value(1), "Bob");
let age_array = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(age_array.value(0), 30);
assert_eq!(age_array.value(1), 25);
let city_array = batch.column(2).as_string::<i32>();
assert_eq!(city_array.null_count(), 2);
assert!(city_array.is_null(0));
assert!(city_array.is_null(1));
}
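// Verifies that a file without field-id metadata split across multiple row groups
// returns all rows in write order.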
#[tokio::test]
async fn test_read_parquet_without_field_ids_multiple_row_groups() {
use arrow_array::Int32Array;
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_write_batch_size(2)
.set_max_row_group_size(2)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
for batch_num in 0..3 {
let name_data = Arc::new(StringArray::from(vec![
format!("name_{}", batch_num * 2),
format!("name_{}", batch_num * 2 + 1),
])) as ArrayRef;
let value_data =
Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
let batch =
RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
writer.write(&batch).expect("Writing batch");
}
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert!(!result.is_empty());
let mut all_names = Vec::new();
let mut all_values = Vec::new();
for batch in &result {
let name_array = batch.column(0).as_string::<i32>();
let value_array = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
for i in 0..batch.num_rows() {
all_names.push(name_array.value(i).to_string());
all_values.push(value_array.value(i));
}
}
assert_eq!(all_names.len(), 6);
assert_eq!(all_values.len(), 6);
for i in 0..6 {
assert_eq!(all_names[i], format!("name_{i}"));
assert_eq!(all_values[i], i as i32);
}
}
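// Verifies that a nested struct column is read correctly from a file written
// without field-id metadata.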
#[tokio::test]
async fn test_read_parquet_without_field_ids_with_struct() {
use arrow_array::{Int32Array, StructArray};
use arrow_schema::Fields;
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(
2,
"person",
Type::Struct(crate::spec::StructType::new(vec![
NestedField::required(
3,
"name",
Type::Primitive(PrimitiveType::String),
)
.into(),
NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
.into(),
])),
)
.into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"person",
DataType::Struct(Fields::from(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
])),
false,
),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
let person_data = Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("name", DataType::Utf8, false)),
name_data,
),
(
Arc::new(Field::new("age", DataType::Int32, false)),
age_data,
),
])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 2);
let id_array = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_array.value(0), 1);
assert_eq!(id_array.value(1), 2);
let person_array = batch.column(1).as_struct();
assert_eq!(person_array.num_columns(), 2);
let name_array = person_array.column(0).as_string::<i32>();
assert_eq!(name_array.value(0), "Alice");
assert_eq!(name_array.value(1), "Bob");
let age_array = person_array
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(age_array.value(0), 30);
assert_eq!(age_array.value(1), 25);
}
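// Verifies that a column added in the middle of the schema ("newCol", field id 5)
// appears at its projected position filled with nulls, while the original columns
// keep their values, for a file without field-id metadata.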
#[tokio::test]
async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
use arrow_array::{Array, Int32Array};
let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
Field::new("col0", DataType::Int32, true),
Field::new("col1", DataType::Int32, true),
]));
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 3);
let result_col0 = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(result_col0.value(0), 1);
assert_eq!(result_col0.value(1), 2);
let result_newcol = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(result_newcol.null_count(), 2);
assert!(result_newcol.is_null(0));
assert!(result_newcol.is_null(1));
let result_col1 = batch
.column(2)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(result_col1.value(0), 10);
assert_eq!(result_col1.value(1), 20);
}
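// Verifies that row-group filtering and row selection on a file without field-id
// metadata return no rows when the predicate matches nothing.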
#[tokio::test]
async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
use arrow_array::{Float64Array, Int32Array};
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
.into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("value", DataType::Float64, false),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{table_location}/1.parquet")).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let predicate = Reference::new("id").less_than(Datum::int(5));
let reader = ArrowReaderBuilder::new(file_io)
.with_row_group_filtering_enabled(true)
.with_row_selection_enabled(true)
.build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}
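// Verifies that reading three files with a data-file concurrency limit of 1 returns
// every row and preserves the task order in the output.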
#[tokio::test]
async fn test_read_with_concurrency_one() {
use arrow_array::Int32Array;
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
.into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
for file_num in 0..3 {
let id_data = Arc::new(Int32Array::from_iter_values(
file_num * 10..(file_num + 1) * 10,
)) as ArrayRef;
let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
}
let reader = ArrowReaderBuilder::new(file_io)
.with_data_file_concurrency_limit(1)
.build();
let tasks = vec![
Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_0.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_2.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
];
let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
let result = reader
.read(tasks_stream)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 30, "Should have 30 total rows");
let mut all_ids = Vec::new();
let mut all_file_nums = Vec::new();
for batch in &result {
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let file_num_col = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();
for i in 0..batch.num_rows() {
all_ids.push(id_col.value(i));
all_file_nums.push(file_num_col.value(i));
}
}
assert_eq!(all_ids.len(), 30);
assert_eq!(all_file_nums.len(), 30);
for i in 0..10 {
assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
}
for i in 10..20 {
assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
}
for i in 20..30 {
assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
}
}
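// Verifies that when a column is bucket-partitioned, its values are still read from
// the data file rather than being substituted from the partition metadata.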
#[tokio::test]
async fn test_bucket_partitioning_reads_source_column_from_file() {
use arrow_array::Int32Array;
use crate::spec::{Literal, PartitionSpec, Struct, Transform};
let schema = Arc::new(
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);
let partition_spec = Arc::new(
PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("id", "id_bucket", Transform::Bucket(4))
.unwrap()
.build()
.unwrap(),
);
let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();
let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
let name_data =
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/data.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;
let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 1);
assert_eq!(id_col.value(1), 5);
assert_eq!(id_col.value(2), 9);
assert_eq!(id_col.value(3), 13);
let name_col = batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}
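// Unit tests for merge_ranges: after sorting, ranges that overlap or whose gaps fall
// within the coalesce threshold are merged into a single range.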
#[test]
fn test_merge_ranges_empty() {
assert_eq!(super::merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
}
#[test]
fn test_merge_ranges_no_coalesce() {
let ranges = vec![0..100, 1_000_000..1_000_100];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
}
#[test]
fn test_merge_ranges_coalesce() {
let ranges = vec![0..100, 200..300, 500..600];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..600]);
}
#[test]
fn test_merge_ranges_overlapping() {
let ranges = vec![0..200, 100..300];
let merged = super::merge_ranges(&ranges, 0);
assert_eq!(merged, vec![0..300]);
}
#[test]
fn test_merge_ranges_unsorted() {
let ranges = vec![500..600, 0..100, 200..300];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..600]);
}
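// In-memory FileRead backed by a deterministic repeating byte pattern, used to
// exercise ArrowFileReader::get_byte_ranges below.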
struct MockFileRead {
data: bytes::Bytes,
}
impl MockFileRead {
fn new(size: usize) -> Self {
let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
Self {
data: bytes::Bytes::from(data),
}
}
}
#[async_trait::async_trait]
impl crate::io::FileRead for MockFileRead {
async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
Ok(self.data.slice(range.start as usize..range.end as usize))
}
}
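// Verifies that get_byte_ranges returns one result per requested range with the
// correct bytes, across different coalesce thresholds and fetch-concurrency settings.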
#[tokio::test]
async fn test_get_byte_ranges_no_coalesce() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(2048);
let expected_0 = mock.data.slice(0..100);
let expected_1 = mock.data.slice(1500..1600);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
.with_parquet_read_options(
super::ParquetReadOptions::builder()
.with_range_coalesce_bytes(0)
.build(),
);
let result = reader
.get_byte_ranges(vec![0..100, 1500..1600])
.await
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], expected_0);
assert_eq!(result[1], expected_1);
}
#[tokio::test]
async fn test_get_byte_ranges_with_coalesce() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(1024);
let expected_0 = mock.data.slice(0..100);
let expected_1 = mock.data.slice(200..300);
let expected_2 = mock.data.slice(500..600);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
.with_parquet_read_options(
super::ParquetReadOptions::builder()
.with_range_coalesce_bytes(1024)
.build(),
);
let result = reader
.get_byte_ranges(vec![0..100, 200..300, 500..600])
.await
.unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0], expected_0);
assert_eq!(result[1], expected_1);
assert_eq!(result[2], expected_2);
}
#[tokio::test]
async fn test_get_byte_ranges_empty() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(1024);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock));
let result = reader.get_byte_ranges(vec![]).await.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn test_get_byte_ranges_coalesce_max() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(2048);
let expected_0 = mock.data.slice(0..100);
let expected_1 = mock.data.slice(1500..1600);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
.with_parquet_read_options(
super::ParquetReadOptions::builder()
.with_range_coalesce_bytes(u64::MAX)
.build(),
);
let result = reader
.get_byte_ranges(vec![0..100, 1500..1600])
.await
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], expected_0);
assert_eq!(result[1], expected_1);
}
#[tokio::test]
async fn test_get_byte_ranges_concurrency_zero() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(1024);
let expected = mock.data.slice(0..100);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
.with_parquet_read_options(
super::ParquetReadOptions::builder()
.with_range_fetch_concurrency(0)
.build(),
);
let result = reader
.get_byte_ranges(vec![0..100, 200..300])
.await
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], expected);
}
#[tokio::test]
async fn test_get_byte_ranges_concurrency_one() {
use parquet::arrow::async_reader::AsyncFileReader;
let mock = MockFileRead::new(2048);
let expected_0 = mock.data.slice(0..100);
let expected_1 = mock.data.slice(500..600);
let expected_2 = mock.data.slice(1500..1600);
let mut reader =
super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
.with_parquet_read_options(
super::ParquetReadOptions::builder()
.with_range_coalesce_bytes(0)
.with_range_fetch_concurrency(1)
.build(),
);
let result = reader
.get_byte_ranges(vec![0..100, 500..600, 1500..1600])
.await
.unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0], expected_0);
assert_eq!(result[1], expected_1);
assert_eq!(result[2], expected_2);
}
}