use std::path::Path;
use std::sync::Arc;
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use crate::predicate::Predicate;
use crate::{ColumnarError, ColumnarTable, CompressionMode, ParquetFileMetadata};
/// Reads every record batch from the Parquet file at `path`.
///
/// # Errors
///
/// Fails if the file cannot be opened, if the Parquet reader cannot be
/// constructed from it, or if any batch fails to decode.
pub(crate) fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, ColumnarError> {
    let source = std::fs::File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(source)?.build()?;
    // Collecting into a `Result` stops at the first batch that fails to decode.
    let decoded = reader.collect::<Result<Vec<RecordBatch>, _>>()?;
    Ok(decoded)
}
/// Reads record batches from the Parquet file at `path`, decoding only the
/// leaf columns whose indices are listed in `column_indices`.
///
/// The indices are handed to [`ProjectionMask::leaves`] unchanged, so they
/// refer to leaf columns of the file's Parquet schema.
///
/// # Errors
///
/// Fails if the file cannot be opened, if the Parquet reader cannot be
/// constructed, or if any batch fails to decode.
pub(crate) fn read_batches_with_projection(
    path: &Path,
    column_indices: &[usize],
) -> Result<Vec<RecordBatch>, ColumnarError> {
    let source = std::fs::File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(source)?;
    // The mask owns its data, so the borrow of `builder` ends before it is consumed.
    let leaf_mask =
        ProjectionMask::leaves(builder.parquet_schema(), column_indices.iter().copied());
    let decoded = builder
        .with_projection(leaf_mask)
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;
    Ok(decoded)
}
/// Collects summary metadata for the Parquet file at `path` without decoding
/// any record batches.
///
/// The returned [`ParquetFileMetadata`] holds the Arrow schema, the row and
/// row-group counts from the Parquet footer, the number of top-level schema
/// fields, and the on-disk file size in bytes.
///
/// # Errors
///
/// Fails if the file cannot be opened, if its filesystem metadata cannot be
/// read, or if the Parquet reader cannot be constructed.
pub(crate) fn read_metadata(path: &Path) -> Result<ParquetFileMetadata, ColumnarError> {
    let source = std::fs::File::open(path)?;
    // Capture the size before the handle is moved into the reader builder.
    let file_size = source.metadata()?.len();
    let builder = ParquetRecordBatchReaderBuilder::try_new(source)?;
    let footer = builder.metadata();
    let schema = builder.schema().clone();
    Ok(ParquetFileMetadata {
        num_rows: footer.file_metadata().num_rows(),
        num_row_groups: footer.num_row_groups(),
        // Evaluated before `schema` is moved into the struct below.
        num_columns: schema.fields().len(),
        schema,
        file_size,
    })
}
/// Extracts summary metadata from an in-memory Parquet payload.
///
/// If `data` begins with `OXIA_MAGIC`, the bytes after the magic prefix are
/// passed through `crate::decompress_payload` first; otherwise `data` is
/// treated as a Parquet file as-is.
///
/// The reported `file_size` is the length of `data` as supplied, i.e. before
/// any decompression.
///
/// # Errors
///
/// Fails if decompression fails or if the Parquet reader cannot be
/// constructed from the resulting bytes.
pub(crate) fn read_metadata_from_bytes(
    data: &[u8],
) -> Result<crate::ParquetFileMetaInfo, ColumnarError> {
    use crate::OXIA_MAGIC;

    let parquet_bytes = match data.strip_prefix(OXIA_MAGIC) {
        Some(payload) => crate::decompress_payload(payload)?,
        None => data.to_vec(),
    };
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes))?;
    let footer = builder.metadata();
    Ok(crate::ParquetFileMetaInfo {
        num_rows: footer.file_metadata().num_rows(),
        num_row_groups: footer.num_row_groups(),
        num_columns: builder.schema().fields().len(),
        file_size: data.len() as u64,
    })
}
/// Decodes an in-memory Parquet file into a [`ColumnarTable`] containing all
/// of its record batches.
///
/// The table's schema is the Arrow schema reported by the reader builder and
/// its compression mode is set to [`CompressionMode::None`].
///
/// # Errors
///
/// Fails if the Parquet reader cannot be constructed from `data` or if any
/// batch fails to decode.
pub(crate) fn read_batches_from_bytes(data: &[u8]) -> Result<ColumnarTable, ColumnarError> {
    // The reader needs an owned buffer, so the input slice is copied once.
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(data.to_vec()))?;
    let schema = Arc::clone(builder.schema());
    let batches = builder
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;
    Ok(ColumnarTable {
        schema,
        batches,
        compression: CompressionMode::None,
    })
}
/// Decodes only the named `columns` from an in-memory Parquet file.
///
/// Leaf columns of the Parquet schema whose name appears in `columns` are
/// selected; names that match nothing are ignored. The resulting table's
/// schema is taken from the first decoded batch. When no batch is produced,
/// it is built from the file's Arrow schema by keeping the fields whose name
/// appears in `columns`.
///
/// # Errors
///
/// Fails if the Parquet reader cannot be constructed from `data` or if any
/// batch fails to decode.
pub(crate) fn read_columns_from_bytes(
    data: &[u8],
    columns: &[&str],
) -> Result<ColumnarTable, ColumnarError> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(data.to_vec()))?;
    let file_schema = builder.schema().clone();
    let parquet_schema = builder.parquet_schema().clone();

    // Translate the requested names into leaf-column positions.
    let mut leaf_indices: Vec<usize> = Vec::new();
    for (idx, leaf) in parquet_schema.columns().iter().enumerate() {
        if columns.contains(&leaf.name()) {
            leaf_indices.push(idx);
        }
    }
    let mask = ProjectionMask::leaves(&parquet_schema, leaf_indices);

    let batches = builder
        .with_projection(mask)
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;

    let projected_schema = match batches.first() {
        Some(first) => first.schema(),
        // No batch to borrow a schema from: derive it from the file schema.
        None => {
            let kept: Vec<Arc<Field>> = file_schema
                .fields()
                .iter()
                .filter(|field| columns.contains(&field.name().as_str()))
                .cloned()
                .collect();
            Arc::new(Schema::new(kept))
        }
    };

    Ok(ColumnarTable {
        schema: projected_schema,
        batches,
        compression: CompressionMode::None,
    })
}
/// Decodes an in-memory Parquet file, skipping row groups that `pred` rules
/// out.
///
/// A row group is read only if `pred.row_group_might_match` returns `true`
/// for its metadata. This function applies no further filtering itself, so
/// every row of each surviving row group is returned. The table keeps the
/// file's full Arrow schema.
///
/// # Errors
///
/// Fails if the Parquet reader cannot be constructed from `data` or if any
/// batch fails to decode.
pub(crate) fn read_with_predicate_from_bytes(
    data: &[u8],
    pred: &Predicate,
) -> Result<ColumnarTable, ColumnarError> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(data.to_vec()))?;
    // Keep our own handle on the footer; `builder` is consumed further down.
    let footer = builder.metadata().clone();
    let schema_desc = footer.file_metadata().schema_descr();

    let mut surviving: Vec<usize> = Vec::new();
    for group_idx in 0..footer.num_row_groups() {
        if pred.row_group_might_match(footer.row_group(group_idx), schema_desc) {
            surviving.push(group_idx);
        }
    }

    let schema = builder.schema().clone();
    let batches = builder
        .with_row_groups(surviving)
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;

    Ok(ColumnarTable {
        schema,
        batches,
        compression: CompressionMode::None,
    })
}
pub(crate) fn read_with_projection_and_predicate_from_bytes(
data: &[u8],
columns: &[&str],
pred: &Predicate,
) -> Result<ColumnarTable, ColumnarError> {
let cursor = Bytes::from(data.to_vec());
let builder = ParquetRecordBatchReaderBuilder::try_new(cursor)?;
let meta = builder.metadata().clone();
let schema_desc = meta.file_metadata().schema_descr();
let surviving: Vec<usize> = (0..meta.num_row_groups())
.filter(|&i| pred.row_group_might_match(meta.row_group(i), schema_desc))
.collect();
let parquet_schema = builder.parquet_schema().clone();
let indices: Vec<usize> = parquet_schema
.columns()
.iter()
.enumerate()
.filter_map(|(i, c)| {
if columns.contains(&c.name()) {
Some(i)
} else {
None
}
})
.collect();
let mask = ProjectionMask::leaves(&parquet_schema, indices);
let reader = builder
.with_row_groups(surviving)
.with_projection(mask)
.build()?;
let batches: Result<Vec<RecordBatch>, _> = reader.collect();
let batches = batches?;
let projected_schema = if let Some(first) = batches.first() {
first.schema()
} else {
let arrow_schema = meta.file_metadata().schema_descr();
let _schema_desc = arrow_schema; Arc::new(Schema::empty())
};
Ok(ColumnarTable {
schema: projected_schema,
batches,
compression: CompressionMode::None,
})
}
/// Decodes an in-memory Parquet file and reshapes every batch to the `target`
/// schema.
///
/// For each field in `target`:
/// - if the file has a field of the same name, its data type must equal the
///   target's, and the column is read from the file;
/// - otherwise the column is filled with nulls of the target data type.
///
/// File columns that are not named in `target` are not read.
///
/// # Errors
///
/// Returns [`ColumnarError::SchemaMismatch`] at the first target field whose
/// file data type differs. Also fails if the Parquet reader cannot be
/// constructed, if a batch fails to decode, or if a reshaped batch cannot be
/// assembled against `target`.
pub(crate) fn read_with_schema_from_bytes(
    data: &[u8],
    target: &Arc<Schema>,
) -> Result<ColumnarTable, ColumnarError> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(data.to_vec()))?;
    let file_schema = builder.schema().clone();

    // Single pass over the target: validate shared columns and remember their names.
    let mut columns_to_read: Vec<&str> = Vec::new();
    for wanted in target.fields() {
        if let Ok(file_idx) = file_schema.index_of(wanted.name()) {
            let present = file_schema.field(file_idx);
            if present.data_type() != wanted.data_type() {
                return Err(ColumnarError::SchemaMismatch(format!(
                    "column '{}': file type {:?} != target type {:?}",
                    wanted.name(),
                    present.data_type(),
                    wanted.data_type()
                )));
            }
            columns_to_read.push(wanted.name().as_str());
        }
    }

    let parquet_schema = builder.parquet_schema().clone();
    let leaf_indices: Vec<usize> = parquet_schema
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, leaf)| columns_to_read.contains(&leaf.name()))
        .map(|(idx, _)| idx)
        .collect();
    let mask = ProjectionMask::leaves(&parquet_schema, leaf_indices);

    let decoded = builder
        .with_projection(mask)
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;

    // Rebuild each batch in target field order, padding absent columns with nulls.
    let result_batches = decoded
        .iter()
        .map(|batch| {
            let row_count = batch.num_rows();
            let batch_schema = batch.schema();
            let cols: Vec<ArrayRef> = target
                .fields()
                .iter()
                .map(|wanted| match batch_schema.index_of(wanted.name()) {
                    Ok(idx) => Arc::clone(batch.column(idx)),
                    Err(_) => new_null_array(wanted.data_type(), row_count),
                })
                .collect();
            RecordBatch::try_new(Arc::clone(target), cols)
        })
        .collect::<Result<Vec<RecordBatch>, _>>()?;

    Ok(ColumnarTable {
        schema: Arc::clone(target),
        batches: result_batches,
        compression: CompressionMode::None,
    })
}