use crate::error::{IoError, Result};
pub mod conversion;
pub mod options;
pub mod predicates;
pub mod reader;
pub mod schema;
pub mod statistics;
pub mod writer;
pub use conversion::{arrow_to_ndarray, ndarray_to_arrow};
pub use options::{CompressionCodec, ParquetVersion, ParquetWriteOptions};
pub use predicates::{
analyze_predicate_effectiveness, read_parquet_filtered, read_parquet_filtered_chunked,
FilterConfig, ParquetPredicate, PredicateAnalysis, PredicateValue,
};
pub use reader::{
read_parquet, read_parquet_chunked, read_parquet_chunked_columns, read_parquet_columns,
ParquetChunkIterator, ParquetData, ParquetReader,
};
pub use schema::{infer_arrow_schema, ParquetSchema};
pub use statistics::{read_parquet_statistics, ColumnStatistics, ParquetFileStatistics};
pub use writer::{write_parquet, write_parquet_with_name, ParquetWriter};
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::Array1;
use std::fs;
use tempfile::tempdir;
#[test]
fn test_parquet_roundtrip_f64() {
    // Write five f64 values under the default column name and read them
    // back, checking column count, length, and both endpoints.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    assert_eq!(table.column_names().len(), 1);
    let col = table
        .get_column_f64("value")
        .expect("Test I/O operation failed");
    assert_eq!(col.len(), 5);
    assert_eq!(col[0], 1.0);
    assert_eq!(col[4], 5.0);
}
#[test]
fn test_compression_codecs() {
    // Round-trip the same 100-row column under each supported codec and
    // confirm the row count is preserved.
    let tmp = tempdir().expect("Test I/O operation failed");
    let raw: Vec<f64> = (0..100).map(f64::from).collect();
    let values = Array1::from_vec(raw);
    let codecs = [
        CompressionCodec::Uncompressed,
        CompressionCodec::Snappy,
        CompressionCodec::Gzip,
    ];
    for codec in codecs {
        let file = tmp.path().join(format!("test_{:?}.parquet", codec));
        let opts = ParquetWriteOptions {
            compression: codec,
            ..Default::default()
        };
        write_parquet(&file, &values, opts).expect("Test I/O operation failed");
        let table = read_parquet(&file).expect("Test I/O operation failed");
        assert_eq!(table.num_rows(), 100);
    }
}
#[test]
fn test_parquet_roundtrip_i32() {
    // Integer (i32) columns must survive a write/read cycle unchanged.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_i32.parquet");
    let values = Array1::from_vec(vec![10i32, 20, 30, 40, 50]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    assert_eq!(table.num_rows(), 5);
    let col = table
        .get_column_i32("value")
        .expect("Test I/O operation failed");
    assert_eq!((col[0], col[4]), (10, 50));
}
#[test]
fn test_parquet_roundtrip_f32() {
    // f32 data round-trips; compare with a small tolerance rather than
    // exact equality.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_f32.parquet");
    let values = Array1::from_vec(vec![1.5f32, 2.5, 3.5, 4.5]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    let col = table
        .get_column_f32("value")
        .expect("Test I/O operation failed");
    assert_eq!(col.len(), 4);
    let delta = (col[0] - 1.5).abs();
    assert!(delta < 1e-6);
}
#[test]
fn test_custom_column_name() {
    // write_parquet_with_name controls the stored column name, which the
    // reader must report and serve lookups under.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_named.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0]);
    write_parquet_with_name(&file, &values, "temperature", Default::default())
        .expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    assert_eq!(table.column_names(), vec!["temperature"]);
    let col = table
        .get_column_f64("temperature")
        .expect("Test I/O operation failed");
    assert_eq!(col.len(), 3);
}
#[test]
fn test_large_dataset() {
    // 10k rows exercise the bulk write/read path; spot-check both ends.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_large.parquet");
    let raw: Vec<f64> = (0..10000).map(|i| i as f64 * 0.1).collect();
    let values = Array1::from_vec(raw);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    assert_eq!(table.num_rows(), 10000);
    let col = table
        .get_column_f64("value")
        .expect("Test I/O operation failed");
    assert!((col[0] - 0.0).abs() < 1e-10);
    assert!((col[9999] - 999.9).abs() < 1e-6);
}
#[test]
fn test_schema_introspection() {
    // The loaded data exposes schema metadata: column count, names, and
    // per-field lookup (present and absent cases).
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_schema.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0]);
    write_parquet_with_name(&file, &values, "measurements", Default::default())
        .expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    let schema = table.schema();
    assert_eq!(schema.num_columns(), 1);
    assert_eq!(schema.column_names(), vec!["measurements"]);
    assert!(schema.field("measurements").is_some());
    assert!(schema.field("nonexistent").is_none());
}
#[test]
fn test_options_builder_pattern() {
    // Write options can be assembled fluently via the builder helpers and
    // produce a file on disk.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_builder.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0]);
    let opts = ParquetWriteOptions::with_compression(CompressionCodec::Brotli)
        .with_row_group_size(500)
        .with_dictionary(false);
    write_parquet(&file, &values, opts).expect("Test I/O operation failed");
    assert!(file.exists());
}
#[test]
fn test_error_invalid_column() {
    // Requesting a missing column, or an existing column with the wrong
    // element type, must return an error rather than panic.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_error.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    assert!(table.get_column_f64("nonexistent").is_err());
    // "value" exists but holds f64 data, so an i32 read is a type mismatch.
    assert!(table.get_column_i32("value").is_err());
}
#[test]
fn test_data_accuracy() {
    // High-precision f64 values must round-trip to within 1e-10.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_accuracy.parquet");
    let original = Array1::from_vec(vec![
        1.23456789,
        2.98765432,
        std::f64::consts::PI,
        4.71238898,
        5.55555555,
    ]);
    write_parquet(&file, &original, Default::default()).expect("Test I/O operation failed");
    let table = read_parquet(&file).expect("Test I/O operation failed");
    let recovered = table
        .get_column_f64("value")
        .expect("Test I/O operation failed");
    assert_eq!(recovered.len(), original.len());
    for (expected, actual) in original.iter().zip(recovered.iter()) {
        assert!(
            (expected - actual).abs() < 1e-10,
            "Value mismatch: {} vs {}",
            expected,
            actual
        );
    }
}
#[test]
fn test_chunked_reading() {
    // 100 rows read with a chunk size of 10 should yield exactly 10
    // chunks that together cover all rows in order.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_chunked.parquet");
    let values = Array1::from_vec((0..100).map(|x| x as f64).collect::<Vec<_>>());
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let chunks = read_parquet_chunked(&file, 10)
        .expect("Operation failed")
        .collect::<Result<Vec<_>>>()
        .expect("Test I/O operation failed");
    assert_eq!(chunks.len(), 10);
    let mut total = 0usize;
    for chunk in &chunks {
        total += chunk.num_rows();
    }
    assert_eq!(total, 100);
    let head = chunks[0]
        .get_column_f64("value")
        .expect("Test I/O operation failed");
    assert_eq!(head[0], 0.0);
    assert_eq!(head[9], 9.0);
}
#[test]
fn test_chunked_column_selection() {
    // Column projection during chunked reads: only the requested column
    // appears in each chunk, and the chunk count matches 50 rows / 10.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_chunked_cols.parquet");
    let values = Array1::from_vec((0..50).map(|x| x as f64).collect::<Vec<_>>());
    write_parquet_with_name(&file, &values, "column_a", Default::default())
        .expect("Test I/O operation failed");
    let chunks = read_parquet_chunked_columns(&file, &["column_a"], 10)
        .expect("Operation failed")
        .collect::<Result<Vec<_>>>()
        .expect("Test I/O operation failed");
    assert_eq!(chunks.len(), 5);
    assert_eq!(chunks[0].num_columns(), 1);
    assert_eq!(chunks[0].column_names(), vec!["column_a"]);
}
#[test]
fn test_chunked_reading_schema() {
    // The chunk iterator exposes the schema before any chunk is consumed.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_schema_chunk.parquet");
    let values = Array1::from_vec(vec![1.0, 2.0, 3.0]);
    write_parquet_with_name(&file, &values, "test_col", Default::default())
        .expect("Test I/O operation failed");
    let mut iter = read_parquet_chunked(&file, 2).expect("Test I/O operation failed");
    let schema = iter.schema();
    assert_eq!(schema.num_columns(), 1);
    assert_eq!(schema.column_names(), vec!["test_col"]);
    // 3 rows at chunk size 2 -> two chunks (sizes 2 and 1).
    let chunks = iter
        .collect::<Result<Vec<_>>>()
        .expect("Test I/O operation failed");
    assert_eq!(chunks.len(), 2);
}
#[test]
fn test_chunked_memory_efficiency() {
    // Stream 10k rows in chunks of at most 100, tallying rows and chunks
    // without materializing the whole file at once.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_memory.parquet");
    let values = Array1::from_vec((0..10000).map(|x| x as f64).collect::<Vec<_>>());
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let (mut rows_seen, mut chunks_seen) = (0usize, 0usize);
    for item in read_parquet_chunked(&file, 100).expect("Operation failed") {
        let chunk = item.expect("Test I/O operation failed");
        assert!(chunk.num_rows() <= 100);
        rows_seen += chunk.num_rows();
        chunks_seen += 1;
    }
    assert_eq!(rows_seen, 10000);
    assert_eq!(chunks_seen, 100);
}
#[test]
fn test_empty_chunked_reading() {
    // A missing file surfaces as an error, not a panic or empty iterator.
    let outcome = read_parquet_chunked("nonexistent.parquet", 10);
    assert!(outcome.is_err());
}
#[test]
fn test_chunked_single_row() {
    // A one-row file yields exactly one single-row chunk, regardless of
    // the requested chunk size.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_single_row.parquet");
    let values = Array1::from_vec(vec![42.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let chunks = read_parquet_chunked(&file, 10)
        .expect("Operation failed")
        .collect::<Result<Vec<_>>>()
        .expect("Test I/O operation failed");
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].num_rows(), 1);
    let col = chunks[0]
        .get_column_f64("value")
        .expect("Test I/O operation failed");
    assert_eq!(col[0], 42.0);
}
#[test]
fn test_statistics_api() {
    // File-level statistics report the row count and per-column
    // summaries (value count, null presence).
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_stats_api.parquet");
    let values = Array1::from_vec(vec![10.0, 20.0, 30.0, 40.0, 50.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let stats = read_parquet_statistics(&file).expect("Test I/O operation failed");
    assert_eq!(stats.num_rows, 5);
    assert!(stats.has_statistics());
    assert!(stats.column_stats("value").is_some());
    let col_stats = stats
        .column_stats("value")
        .expect("Test I/O operation failed");
    assert_eq!(col_stats.num_values, 5);
    assert!(!col_stats.has_nulls());
}
#[test]
fn test_predicate_filtering() {
    // A greater-than predicate over "value" should succeed and yield a
    // non-empty filtered result (values above 20.0 exist in the input).
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_predicate.parquet");
    let values = Array1::from_vec(vec![5.0, 15.0, 25.0, 35.0, 45.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let config = FilterConfig::new(ParquetPredicate::gt(
        "value",
        PredicateValue::Float64(20.0),
    ));
    let outcome = read_parquet_filtered(&file, config);
    assert!(outcome.is_ok());
    let filtered = outcome.expect("Test I/O operation failed");
    assert!(filtered.num_rows() > 0);
}
#[test]
fn test_predicate_chunked_filtering() {
    // Predicate filtering also works on the chunked reading path with a
    // custom batch size.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_pred_chunked.parquet");
    let values = Array1::from_vec((0..100).map(|x| x as f64).collect::<Vec<_>>());
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let config = FilterConfig::new(ParquetPredicate::gt(
        "value",
        PredicateValue::Float64(50.0),
    ))
    .with_batch_size(10);
    let chunks = read_parquet_filtered_chunked(&file, config)
        .expect("Operation failed")
        .collect::<Result<Vec<_>>>()
        .expect("Test I/O operation failed");
    assert!(!chunks.is_empty());
}
#[test]
fn test_combined_statistics_and_predicates() {
    // Statistics inspection and predicate filtering can both be applied
    // to the same file.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_combined.parquet");
    let values = Array1::from_vec(vec![1.0, 5.0, 10.0, 15.0, 20.0]);
    write_parquet(&file, &values, Default::default()).expect("Test I/O operation failed");
    let stats = read_parquet_statistics(&file).expect("Test I/O operation failed");
    assert_eq!(stats.num_rows, 5);
    let config = FilterConfig::new(ParquetPredicate::gt(
        "value",
        PredicateValue::Float64(8.0),
    ));
    let filtered = read_parquet_filtered(&file, config).expect("Test I/O operation failed");
    assert!(filtered.num_rows() > 0);
}
#[test]
fn test_statistics_with_compression() {
    // Statistics must still be present when the file is written with
    // compression and statistics explicitly enabled.
    let tmp = tempdir().expect("Test I/O operation failed");
    let file = tmp.path().join("test_stats_compressed.parquet");
    let values = Array1::from_vec((0..50).map(|x| x as f64).collect::<Vec<_>>());
    let opts = ParquetWriteOptions {
        compression: CompressionCodec::Brotli,
        enable_statistics: true,
        ..Default::default()
    };
    write_parquet(&file, &values, opts).expect("Test I/O operation failed");
    let stats = read_parquet_statistics(&file).expect("Test I/O operation failed");
    assert_eq!(stats.num_rows, 50);
    assert!(stats.has_statistics());
}
}