use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;
#[cfg(feature = "streaming")]
use tokio::io::{AsyncRead, AsyncWrite};
use arrow::array::{
Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float64Array, Int64Array,
StringArray, TimestampMicrosecondArray,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::types::Type as ParquetType;
use crate::column::{BooleanColumn, Column, ColumnType, Float64Column, Int64Column, StringColumn};
use crate::dataframe::DataFrame;
use crate::error::{Error, Result};
use crate::optimized::OptimizedDataFrame;
use crate::series::Series;
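/// Compression codecs that can be requested when writing Parquet files.
///
/// Each variant maps onto the corresponding `parquet::basic::Compression`
/// value via the `From` impl below.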
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParquetCompression {
None,
Snappy,
Gzip,
Lzo,
Brotli,
Lz4,
Zstd,
}
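/// File-level metadata extracted from a Parquet file by [`get_parquet_metadata`].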
#[derive(Debug, Clone)]
pub struct ParquetMetadata {
pub num_rows: i64,
pub num_row_groups: usize,
pub schema: String,
pub file_size: Option<i64>,
pub compression: String,
pub created_by: Option<String>,
}
#[derive(Debug, Clone)]
pub struct RowGroupInfo {
pub index: usize,
pub num_rows: i64,
pub total_byte_size: i64,
pub num_columns: usize,
}
#[derive(Debug, Clone)]
pub struct ColumnStats {
pub name: String,
pub data_type: String,
pub null_count: Option<i64>,
pub distinct_count: Option<i64>,
pub min_value: Option<String>,
pub max_value: Option<String>,
}
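/// Options controlling how a Parquet file is read.
///
/// `columns` selects a projection by name, `row_groups` restricts reading to
/// specific row groups, and `batch_size` sets the Arrow record-batch size.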
#[derive(Debug, Clone)]
pub struct ParquetReadOptions {
pub columns: Option<Vec<String>>,
pub use_threads: bool,
pub use_memory_map: bool,
pub batch_size: Option<usize>,
pub row_groups: Option<Vec<usize>>,
}
impl Default for ParquetReadOptions {
fn default() -> Self {
Self {
columns: None,
use_threads: true,
use_memory_map: false,
batch_size: None,
row_groups: None,
}
}
}
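/// Options controlling how a Parquet file is written.
///
/// The defaults are Snappy compression, 50,000-row row groups, 1 MiB data
/// pages, and dictionary encoding enabled.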
#[derive(Debug, Clone)]
pub struct ParquetWriteOptions {
pub compression: ParquetCompression,
pub row_group_size: Option<usize>,
pub page_size: Option<usize>,
pub enable_dictionary: bool,
pub use_threads: bool,
}
impl Default for ParquetWriteOptions {
fn default() -> Self {
Self {
compression: ParquetCompression::Snappy,
row_group_size: Some(50000),
            page_size: Some(1024 * 1024), // 1 MiB
            enable_dictionary: true,
use_threads: true,
}
}
}
impl From<ParquetCompression> for Compression {
fn from(comp: ParquetCompression) -> Self {
match comp {
ParquetCompression::None => Compression::UNCOMPRESSED,
ParquetCompression::Snappy => Compression::SNAPPY,
ParquetCompression::Gzip => Compression::GZIP(Default::default()),
ParquetCompression::Lzo => Compression::LZO,
ParquetCompression::Brotli => Compression::BROTLI(Default::default()),
ParquetCompression::Lz4 => Compression::LZ4,
ParquetCompression::Zstd => Compression::ZSTD(Default::default()),
}
}
}
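/// Reads an entire Parquet file into a [`DataFrame`] using default settings.
///
/// Int64, Float64, Boolean, and UTF-8 columns are converted directly; other
/// Arrow types fall back to string columns. Illustrative usage (the file name
/// is a placeholder):
///
/// ```ignore
/// let df = read_parquet("data.parquet")?;
/// ```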
pub fn read_parquet(path: impl AsRef<Path>) -> Result<DataFrame> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| Error::IoError(format!("Failed to parse Parquet file: {}", e)))?;
let schema = builder.schema().clone();
let reader = builder
.build()
.map_err(|e| Error::IoError(format!("Failed to read Parquet file: {}", e)))?;
let mut all_batches = Vec::new();
for batch_result in reader {
let batch = batch_result
.map_err(|e| Error::IoError(format!("Failed to read record batch: {}", e)))?;
all_batches.push(batch);
}
if all_batches.is_empty() {
return Ok(DataFrame::new());
}
record_batches_to_dataframe(&all_batches, schema)
}
fn record_batches_to_dataframe(batches: &[RecordBatch], schema: SchemaRef) -> Result<DataFrame> {
let mut df = DataFrame::new();
for (col_idx, field) in schema.fields().iter().enumerate() {
let col_name = field.name().clone();
let col_type = field.data_type();
match col_type {
DataType::Int64 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to Int64Array",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
                            // Null values are represented as 0.
                            values.push(0);
                        } else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Float64 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to Float64Array",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
                            // Null values are represented as NaN.
                            values.push(f64::NAN);
                        } else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Boolean => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to BooleanArray",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
                            // Null values are represented as false.
                            values.push(false);
                        } else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Utf8 | DataType::LargeUtf8 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to StringArray",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push("".to_string()); } else {
values.push(array.value(i).to_string());
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
_ => {
let mut values = Vec::new();
for batch in batches {
let array = batch.column(col_idx);
for i in 0..array.len() {
if array.is_null(i) {
values.push("".to_string());
} else {
                            // Fallback for unsupported types: store the array's debug
                            // representation rather than the individual value.
                            values.push(format!("{:?}", array));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
}
}
Ok(df)
}
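/// Writes an [`OptimizedDataFrame`] to a Parquet file as a single record batch.
///
/// When `compression` is `None`, Snappy is used. Illustrative usage, assuming
/// `df` is an existing [`OptimizedDataFrame`] and the file name is a placeholder:
///
/// ```ignore
/// write_parquet(&df, "out.parquet", Some(ParquetCompression::Zstd))?;
/// ```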
pub fn write_parquet(
df: &OptimizedDataFrame,
path: impl AsRef<Path>,
compression: Option<ParquetCompression>,
) -> Result<()> {
let schema_fields: Vec<Field> = df
.column_names()
.iter()
.filter_map(|col_name| {
if let Ok(col_view) = df.column(col_name) {
let data_type = match col_view.column_type() {
crate::column::ColumnType::Int64 => DataType::Int64,
crate::column::ColumnType::Float64 => DataType::Float64,
crate::column::ColumnType::Boolean => DataType::Boolean,
crate::column::ColumnType::String => DataType::Utf8,
};
Some(Field::new(col_name, data_type, true))
} else {
None
}
})
.collect();
let schema = Schema::new(schema_fields);
let schema_ref = Arc::new(schema);
let arrays: Vec<ArrayRef> = df
.column_names()
.iter()
.filter_map(|col_name| {
let col_view = match df.column(col_name) {
Ok(s) => s,
Err(_) => return None,
};
match col_view.column_type() {
crate::column::ColumnType::Int64 => {
if let Some(int_col) = col_view.as_int64() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match int_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
                                    values.push(0);
                                    validity.push(false);
}
Err(_) => {
values.push(0);
validity.push(false);
}
}
}
let array = Int64Array::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::Float64 => {
if let Some(float_col) = col_view.as_float64() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match float_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
                                    values.push(0.0);
                                    validity.push(false);
}
Err(_) => {
values.push(0.0);
validity.push(false);
}
}
}
let array = Float64Array::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::Boolean => {
if let Some(bool_col) = col_view.as_boolean() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match bool_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
                                    values.push(false);
                                    validity.push(false);
}
Err(_) => {
values.push(false);
validity.push(false);
}
}
}
let array = BooleanArray::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::String => {
if let Some(str_col) = col_view.as_string() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match str_col.get(i) {
Ok(Some(val)) => {
values.push(val.to_string());
validity.push(true);
}
Ok(None) => {
                                    values.push(String::new());
                                    validity.push(false);
}
Err(_) => {
values.push(String::new());
validity.push(false);
}
}
}
let string_values: Vec<Option<&str>> = values
.iter()
.zip(validity.iter())
.map(|(s, &is_valid)| if is_valid { Some(s.as_str()) } else { None })
.collect();
let array = StringArray::from(string_values);
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
}
})
.collect();
let batch = RecordBatch::try_new(schema_ref.clone(), arrays)
.map_err(|e| Error::Cast(format!("Failed to create record batch: {}", e)))?;
let compression_type = compression.unwrap_or(ParquetCompression::Snappy);
let props = WriterProperties::builder()
.set_compression(Compression::from(compression_type))
.build();
let file = File::create(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to create Parquet file: {}", e)))?;
let mut writer = ArrowWriter::try_new(file, schema_ref, Some(props))
.map_err(|e| Error::IoError(format!("Failed to create Parquet writer: {}", e)))?;
writer
.write(&batch)
.map_err(|e| Error::IoError(format!("Failed to write record batch: {}", e)))?;
writer
.close()
.map_err(|e| Error::IoError(format!("Failed to close Parquet file: {}", e)))?;
Ok(())
}
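/// Reads file-level metadata (row count, row-group count, schema description,
/// creator) without materializing any data.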
pub fn get_parquet_metadata(path: impl AsRef<Path>) -> Result<ParquetMetadata> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let reader = SerializedFileReader::new(file)
.map_err(|e| Error::IoError(format!("Failed to create Parquet reader: {}", e)))?;
let metadata = reader.metadata();
let file_metadata = metadata.file_metadata();
Ok(ParquetMetadata {
num_rows: file_metadata.num_rows(),
num_row_groups: metadata.num_row_groups(),
schema: format!("{:?}", file_metadata.schema()),
        file_size: None,
        compression: "Various".to_string(),
        created_by: file_metadata.created_by().map(|s| s.to_string()),
})
}
pub fn get_row_group_info(path: impl AsRef<Path>) -> Result<Vec<RowGroupInfo>> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let reader = SerializedFileReader::new(file)
.map_err(|e| Error::IoError(format!("Failed to create Parquet reader: {}", e)))?;
let metadata = reader.metadata();
let mut row_groups = Vec::new();
for i in 0..metadata.num_row_groups() {
let rg_metadata = metadata.row_group(i);
row_groups.push(RowGroupInfo {
index: i,
num_rows: rg_metadata.num_rows(),
total_byte_size: rg_metadata.total_byte_size(),
num_columns: rg_metadata.num_columns(),
});
}
Ok(row_groups)
}
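/// Collects per-column statistics from the row-group metadata.
///
/// Null counts are summed across row groups; min/max values are reported only
/// as placeholders when statistics are present.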
pub fn get_column_statistics(path: impl AsRef<Path>) -> Result<Vec<ColumnStats>> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let reader = SerializedFileReader::new(file)
.map_err(|e| Error::IoError(format!("Failed to create Parquet reader: {}", e)))?;
let metadata = reader.metadata();
let schema = metadata.file_metadata().schema_descr();
let mut column_stats = Vec::new();
let mut null_counts: HashMap<String, u64> = HashMap::new();
let mut min_values: HashMap<String, String> = HashMap::new();
let mut max_values: HashMap<String, String> = HashMap::new();
for rg_idx in 0..metadata.num_row_groups() {
let rg_metadata = metadata.row_group(rg_idx);
for col_idx in 0..rg_metadata.num_columns() {
let col_metadata = rg_metadata.column(col_idx);
let col_name = schema.column(col_idx).name().to_string();
if let Some(statistics) = col_metadata.statistics() {
if let Some(null_count) = statistics.null_count_opt() {
*null_counts.entry(col_name.clone()).or_insert(0) += null_count;
}
                // Typed min/max decoding is not implemented; record a placeholder
                // so callers can see that statistics were present.
                if statistics.min_bytes_opt().is_some() && statistics.max_bytes_opt().is_some() {
min_values
.entry(col_name.clone())
.or_insert_with(|| "N/A".to_string());
max_values
.entry(col_name.clone())
.or_insert_with(|| "N/A".to_string());
}
}
}
}
for col_idx in 0..schema.num_columns() {
let column = schema.column(col_idx);
let col_name = column.name().to_string();
column_stats.push(ColumnStats {
name: col_name.clone(),
data_type: format!("{:?}", column.physical_type()),
null_count: null_counts.get(&col_name).map(|&n| n as i64),
            distinct_count: None,
            min_value: min_values.get(&col_name).cloned(),
max_value: max_values.get(&col_name).cloned(),
});
}
Ok(column_stats)
}
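/// Reads a Parquet file with explicit [`ParquetReadOptions`] (column
/// projection, row-group selection, batch size).
///
/// Illustrative usage (the file and column names are placeholders):
///
/// ```ignore
/// let options = ParquetReadOptions {
///     columns: Some(vec!["id".to_string(), "value".to_string()]),
///     ..Default::default()
/// };
/// let df = read_parquet_advanced("data.parquet", options)?;
/// ```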
pub fn read_parquet_advanced(
path: impl AsRef<Path>,
options: ParquetReadOptions,
) -> Result<DataFrame> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| Error::IoError(format!("Failed to parse Parquet file: {}", e)))?;
if let Some(batch_size) = options.batch_size {
builder = builder.with_batch_size(batch_size);
}
if let Some(row_groups) = options.row_groups {
builder = builder.with_row_groups(row_groups);
}
if let Some(columns) = &options.columns {
let schema = builder.schema();
let mut projection_indices = Vec::new();
for col_name in columns {
for (idx, field) in schema.fields().iter().enumerate() {
if field.name() == col_name {
projection_indices.push(idx);
break;
}
}
}
if !projection_indices.is_empty() {
use parquet::arrow::ProjectionMask;
            let mask = ProjectionMask::roots(builder.parquet_schema(), projection_indices);
builder = builder.with_projection(mask);
}
}
let schema = builder.schema().clone();
let reader = builder
.build()
.map_err(|e| Error::IoError(format!("Failed to read Parquet file: {}", e)))?;
let mut all_batches = Vec::new();
for batch_result in reader {
let batch = batch_result
.map_err(|e| Error::IoError(format!("Failed to read record batch: {}", e)))?;
all_batches.push(batch);
}
if all_batches.is_empty() {
return Ok(DataFrame::new());
}
record_batches_to_dataframe_enhanced(&all_batches, schema)
}
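/// Writes an [`OptimizedDataFrame`] to Parquet with explicit
/// [`ParquetWriteOptions`] (compression, row-group size, page size, and
/// dictionary encoding).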
pub fn write_parquet_advanced(
df: &OptimizedDataFrame,
path: impl AsRef<Path>,
options: ParquetWriteOptions,
) -> Result<()> {
let schema_fields: Vec<Field> = df
.column_names()
.iter()
.filter_map(|col_name| {
if let Ok(col_view) = df.column(col_name) {
let data_type = match col_view.column_type() {
crate::column::ColumnType::Int64 => DataType::Int64,
crate::column::ColumnType::Float64 => DataType::Float64,
crate::column::ColumnType::Boolean => DataType::Boolean,
crate::column::ColumnType::String => {
if options.enable_dictionary {
DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
)
} else {
DataType::Utf8
}
}
};
Some(Field::new(col_name, data_type, true))
} else {
None
}
})
.collect();
let schema = Schema::new(schema_fields);
let schema_ref = Arc::new(schema);
let mut props_builder =
WriterProperties::builder().set_compression(Compression::from(options.compression));
if let Some(row_group_size) = options.row_group_size {
        props_builder = props_builder.set_max_row_group_size(row_group_size);
}
if let Some(page_size) = options.page_size {
props_builder = props_builder.set_data_page_size_limit(page_size);
}
if options.enable_dictionary {
props_builder = props_builder.set_dictionary_enabled(true);
}
let props = props_builder.build();
let arrays: Vec<ArrayRef> = df
.column_names()
.iter()
.filter_map(|col_name| {
let col_view = match df.column(col_name) {
Ok(s) => s,
Err(_) => return None,
};
match col_view.column_type() {
crate::column::ColumnType::Int64 => {
if let Some(int_col) = col_view.as_int64() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match int_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
values.push(0);
validity.push(false);
}
Err(_) => {
values.push(0);
validity.push(false);
}
}
}
let array = Int64Array::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::Float64 => {
if let Some(float_col) = col_view.as_float64() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match float_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
values.push(0.0);
validity.push(false);
}
Err(_) => {
values.push(0.0);
validity.push(false);
}
}
}
let array = Float64Array::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::Boolean => {
if let Some(bool_col) = col_view.as_boolean() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match bool_col.get(i) {
Ok(Some(val)) => {
values.push(val);
validity.push(true);
}
Ok(None) => {
values.push(false);
validity.push(false);
}
Err(_) => {
values.push(false);
validity.push(false);
}
}
}
let array = BooleanArray::new(values.into(), Some(validity.into()));
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
crate::column::ColumnType::String => {
if let Some(str_col) = col_view.as_string() {
let mut values = Vec::with_capacity(df.row_count());
let mut validity = Vec::with_capacity(df.row_count());
for i in 0..df.row_count() {
match str_col.get(i) {
Ok(Some(val)) => {
values.push(val.to_string());
validity.push(true);
}
Ok(None) => {
values.push(String::new());
validity.push(false);
}
Err(_) => {
values.push(String::new());
validity.push(false);
}
}
}
let string_values: Vec<Option<&str>> = values
.iter()
.zip(validity.iter())
.map(|(s, &is_valid)| if is_valid { Some(s.as_str()) } else { None })
.collect();
let array = StringArray::from(string_values);
Some(Arc::new(array) as ArrayRef)
} else {
None
}
}
}
})
.collect();
let batch = RecordBatch::try_new(schema_ref.clone(), arrays)
.map_err(|e| Error::Cast(format!("Failed to create record batch: {}", e)))?;
let file = File::create(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to create Parquet file: {}", e)))?;
let mut writer = ArrowWriter::try_new(file, schema_ref, Some(props))
.map_err(|e| Error::IoError(format!("Failed to create Parquet writer: {}", e)))?;
writer
.write(&batch)
.map_err(|e| Error::IoError(format!("Failed to write record batch: {}", e)))?;
writer
.close()
.map_err(|e| Error::IoError(format!("Failed to close Parquet file: {}", e)))?;
Ok(())
}
fn record_batches_to_dataframe_enhanced(
batches: &[RecordBatch],
schema: SchemaRef,
) -> Result<DataFrame> {
let mut df = DataFrame::new();
for (col_idx, field) in schema.fields().iter().enumerate() {
let col_name = field.name().clone();
let col_type = field.data_type();
match col_type {
DataType::Int64 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to Int64Array",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push(0);
} else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Float64 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to Float64Array",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push(f64::NAN);
} else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Boolean => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to BooleanArray",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push(false);
} else {
values.push(array.value(i));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Utf8 | DataType::LargeUtf8 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to StringArray",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push("".to_string());
} else {
values.push(array.value(i).to_string());
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Date32 => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<Date32Array>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to Date32Array",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push("1970-01-01".to_string());
} else {
                            // Date32 values are days since the Unix epoch; keep a marker string.
                            let days = array.value(i);
                            values.push(format!("Date({})", days));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
let mut values = Vec::new();
for batch in batches {
let array = batch
.column(col_idx)
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| {
Error::Cast(format!(
"Failed to cast column '{}' to TimestampMicrosecondArray",
col_name
))
})?;
for i in 0..array.len() {
if array.is_null(i) {
values.push("1970-01-01T00:00:00".to_string());
} else {
                            // Microseconds since the Unix epoch, kept as a marker string.
                            let micros = array.value(i);
                            values.push(format!("Timestamp({})", micros));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
_ => {
let mut values = Vec::new();
for batch in batches {
let array = batch.column(col_idx);
for i in 0..array.len() {
if array.is_null(i) {
values.push("".to_string());
} else {
                            // Fallback for unsupported types: store a marker with the array's
                            // debug representation rather than the individual value.
                            values.push(format!("Unsupported({:?})", array));
}
}
}
let series = Series::new(values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
}
}
Ok(df)
}
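/// Describes how a source schema maps onto a target schema: column renames,
/// columns to add with default values, columns to remove, and type
/// conversions. Used by [`read_parquet_with_schema_evolution`].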
#[derive(Debug, Clone)]
pub struct SchemaEvolution {
pub source_schema: String,
pub target_schema: String,
pub column_mappings: HashMap<String, String>,
pub columns_to_add: HashMap<String, String>,
pub columns_to_remove: Vec<String>,
pub type_conversions: HashMap<String, String>,
}
impl Default for SchemaEvolution {
fn default() -> Self {
Self {
source_schema: String::new(),
target_schema: String::new(),
column_mappings: HashMap::new(),
columns_to_add: HashMap::new(),
columns_to_remove: Vec::new(),
type_conversions: HashMap::new(),
}
}
}
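/// Row-level filters that can be requested when reading Parquet data.
/// The current implementation logs the requested filters rather than
/// applying them to the rows.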
#[derive(Debug, Clone)]
pub enum PredicateFilter {
Equals(String, String),
Range(String, String, String),
In(String, Vec<String>),
NotNull(String),
Custom(String),
}
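/// Extended read options combining [`ParquetReadOptions`] with schema
/// evolution, predicate filters, streaming mode, and a soft memory limit
/// (1 GiB by default).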
#[derive(Debug, Clone)]
pub struct AdvancedParquetReadOptions {
pub base_options: ParquetReadOptions,
pub schema_evolution: Option<SchemaEvolution>,
pub predicate_filters: Vec<PredicateFilter>,
pub streaming_mode: bool,
pub streaming_chunk_size: usize,
pub memory_limit: Option<usize>,
}
impl Default for AdvancedParquetReadOptions {
fn default() -> Self {
Self {
base_options: ParquetReadOptions::default(),
schema_evolution: None,
predicate_filters: Vec::new(),
streaming_mode: false,
streaming_chunk_size: 10000,
            memory_limit: Some(1024 * 1024 * 1024), // 1 GiB
        }
}
}
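/// Chunk-by-chunk reader over a Parquet file.
///
/// Illustrative usage (the file name and chunk size are placeholders):
///
/// ```ignore
/// let mut reader = StreamingParquetReader::new("data.parquet", 10_000)?;
/// while let Some(chunk) = reader.next_chunk()? {
///     println!("chunk {}: {} rows", reader.current_chunk(), chunk.row_count());
/// }
/// ```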
pub struct StreamingParquetReader {
path: String,
chunk_index: usize,
total_chunks: usize,
chunk_size: usize,
schema: SchemaRef,
current_position: usize,
}
impl StreamingParquetReader {
pub fn new(path: impl AsRef<Path>, chunk_size: usize) -> Result<Self> {
let file = File::open(path.as_ref())
.map_err(|e| Error::IoError(format!("Failed to open Parquet file: {}", e)))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| Error::IoError(format!("Failed to parse Parquet file: {}", e)))?;
let metadata = builder.metadata().clone();
let schema = builder.schema().clone();
let total_rows = metadata.file_metadata().num_rows() as usize;
let total_chunks = (total_rows + chunk_size - 1) / chunk_size;
Ok(Self {
path: path.as_ref().to_string_lossy().to_string(),
chunk_index: 0,
total_chunks,
chunk_size,
schema,
current_position: 0,
})
}
pub fn next_chunk(&mut self) -> Result<Option<DataFrame>> {
if self.chunk_index >= self.total_chunks {
return Ok(None);
}
        // Simplified implementation: the file is re-read with the configured
        // batch size rather than seeking to this chunk's row range.
let options = ParquetReadOptions {
batch_size: Some(self.chunk_size),
..Default::default()
};
let df = read_parquet_advanced(&self.path, options)?;
self.chunk_index += 1;
self.current_position += self.chunk_size;
Ok(Some(df))
}
pub fn total_chunks(&self) -> usize {
self.total_chunks
}
pub fn current_chunk(&self) -> usize {
self.chunk_index
}
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
}
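/// Reads a Parquet file and then applies the given [`SchemaEvolution`] to the
/// resulting [`DataFrame`].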
pub fn read_parquet_with_schema_evolution(
path: impl AsRef<Path>,
schema_evolution: SchemaEvolution,
) -> Result<DataFrame> {
let mut df = read_parquet(path.as_ref())?;
apply_schema_evolution(&mut df, &schema_evolution)?;
Ok(df)
}
fn apply_schema_evolution(df: &mut DataFrame, evolution: &SchemaEvolution) -> Result<()> {
    // Placeholder implementation: renamed columns are added as new string columns
    // with marker values; `columns_to_remove` and `type_conversions` are not applied.
    for (old_name, new_name) in &evolution.column_mappings {
let row_count = df.row_count();
let placeholder_values = vec![format!("renamed_from_{}", old_name); row_count];
let series = Series::new(placeholder_values, Some(new_name.clone()))?;
df.add_column(new_name.clone(), series)?;
}
for (col_name, default_value) in &evolution.columns_to_add {
let row_count = df.row_count();
let default_values = vec![default_value.clone(); row_count];
let series = Series::new(default_values, Some(col_name.clone()))?;
df.add_column(col_name.clone(), series)?;
}
Ok(())
}
pub fn read_parquet_with_predicates(
path: impl AsRef<Path>,
predicates: Vec<PredicateFilter>,
) -> Result<DataFrame> {
let df = read_parquet(path.as_ref())?;
apply_predicate_filters(df, &predicates)
}
fn apply_predicate_filters(df: DataFrame, predicates: &[PredicateFilter]) -> Result<DataFrame> {
    // Placeholder implementation: the requested filters are logged but not applied.
    for predicate in predicates {
match predicate {
PredicateFilter::Equals(column, value) => {
println!("Applying equality filter: {} = {}", column, value);
}
PredicateFilter::Range(column, min, max) => {
println!(
"Applying range filter: {} BETWEEN {} AND {}",
column, min, max
);
}
PredicateFilter::In(column, values) => {
println!("Applying IN filter: {} IN {:?}", column, values);
}
PredicateFilter::NotNull(column) => {
println!("Applying NOT NULL filter: {} IS NOT NULL", column);
}
PredicateFilter::Custom(expression) => {
println!("Applying custom filter: {}", expression);
}
}
}
Ok(df)
}
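/// Reads a Parquet file with [`AdvancedParquetReadOptions`]: dispatches to the
/// streaming path when `streaming_mode` is set, otherwise performs a regular
/// read and then applies schema evolution and predicate filters.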
pub fn read_parquet_enhanced(
path: impl AsRef<Path>,
options: AdvancedParquetReadOptions,
) -> Result<DataFrame> {
if options.streaming_mode {
read_parquet_streaming(path.as_ref(), &options)
} else {
let mut df = read_parquet_advanced(path.as_ref(), options.base_options)?;
if let Some(evolution) = options.schema_evolution {
apply_schema_evolution(&mut df, &evolution)?;
}
if !options.predicate_filters.is_empty() {
df = apply_predicate_filters(df, &options.predicate_filters)?;
}
Ok(df)
}
}
fn read_parquet_streaming(path: &Path, options: &AdvancedParquetReadOptions) -> Result<DataFrame> {
let mut reader = StreamingParquetReader::new(path, options.streaming_chunk_size)?;
let mut combined_df = DataFrame::new();
let mut total_rows = 0;
    while let Some(chunk_df) = reader.next_chunk()? {
        total_rows += chunk_df.row_count();
        if combined_df.row_count() == 0 {
            // Simplified: only the first chunk is materialized; later chunks are
            // counted toward the memory estimate but not merged.
            combined_df = chunk_df;
        }
        if let Some(memory_limit) = options.memory_limit {
            // Rough heuristic: assume about 100 bytes per row.
            let estimated_memory = total_rows * 100;
            if estimated_memory > memory_limit {
                break;
            }
        }
}
Ok(combined_df)
}
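/// Writes an [`OptimizedDataFrame`] to Parquet in logical chunks of
/// `chunk_size` rows. The current implementation reports the chunking but
/// delegates to [`write_parquet`] for a single-pass write.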
pub fn write_parquet_streaming(
df: &OptimizedDataFrame,
path: impl AsRef<Path>,
chunk_size: usize,
) -> Result<()> {
let total_rows = df.row_count();
let num_chunks = (total_rows + chunk_size - 1) / chunk_size;
if num_chunks <= 1 {
return write_parquet(df, path, None);
}
    // Simplified implementation: chunking is reported, but the frame is still
    // written in a single pass below.
    println!(
"Writing {} rows in {} chunks of size {}",
total_rows, num_chunks, chunk_size
);
write_parquet(df, path, None)
}
#[derive(Debug, Clone)]
pub struct ParquetSchemaAnalysis {
pub column_count: usize,
pub columns: HashMap<String, String>,
pub complexity_score: f64,
pub max_nesting_depth: usize,
pub evolution_difficulty: String,
}
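/// Analyzes a Parquet file's schema: column count and physical types, a simple
/// complexity score derived from column and row-group counts, and a rough
/// "Easy"/"Medium"/"Hard" rating for schema-evolution difficulty.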
pub fn analyze_parquet_schema(path: impl AsRef<Path>) -> Result<ParquetSchemaAnalysis> {
let metadata = get_parquet_metadata(path.as_ref())?;
let column_stats = get_column_statistics(path.as_ref())?;
let column_count = column_stats.len();
let mut columns = HashMap::new();
for stat in &column_stats {
columns.insert(stat.name.clone(), stat.data_type.clone());
}
let complexity_score = (column_count as f64 * 1.0) + (metadata.num_row_groups as f64 * 0.1);
let evolution_difficulty = if column_count < 10 {
"Easy".to_string()
} else if column_count < 50 {
"Medium".to_string()
} else {
"Hard".to_string()
};
Ok(ParquetSchemaAnalysis {
column_count,
columns,
complexity_score,
        max_nesting_depth: 1, // nested types are not analyzed
        evolution_difficulty,
})
}