use crate::{infer_column_type, Column, Error, ReadOptions, Reader, Result, Row, Table, Value};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Field;
use std::path::Path;
/// [`Reader`] backend for Apache Parquet files (`.parquet`).
///
/// The reader is stateless: everything that varies between reads arrives through
/// the [`ReadOptions`] passed to [`Reader::read`], so values are trivially copyable.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParquetReader;

impl ParquetReader {
    /// Creates a Parquet reader. Equivalent to [`ParquetReader::default`].
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
impl Reader for ParquetReader {
    /// File extensions handled by this backend.
    fn extensions(&self) -> &[&'static str] {
        &["parquet"]
    }

    /// Short identifier for this backend.
    fn name(&self) -> &'static str {
        "parquet"
    }

    /// Reads the schema, a bounded sample of rows, and footer metadata of a Parquet file.
    ///
    /// Column names are the top-level fields of the file schema. At most
    /// `options.max_sample_rows` rows are decoded; every sampled row is padded with
    /// [`Value::Null`] or truncated so it has exactly one cell per column. Each column's
    /// type and nullability are inferred from its sampled cells via [`infer_column_type`].
    /// `row_count` is the footer's row count, or `None` if that count is not a valid
    /// non-negative number. The metadata map carries `num_row_groups`.
    ///
    /// # Errors
    ///
    /// - [`Error::Io`] (converted by `?`) if the file cannot be opened.
    /// - [`Error::ParseError`] if the Parquet footer cannot be parsed, the row iterator
    ///   cannot be created, or a sampled row fails to decode.
    fn read(&self, path: &Path, options: &ReadOptions) -> Result<Table> {
        let file = std::fs::File::open(path)?;
        let reader = SerializedFileReader::new(file)
            .map_err(|e| Error::ParseError(format!("parquet open failed: {e}")))?;

        let metadata = reader.metadata();
        let file_meta = metadata.file_metadata();
        let total_rows = file_meta.num_rows();
        let column_names: Vec<String> = file_meta
            .schema()
            .get_fields()
            .iter()
            .map(|f| f.name().to_string())
            .collect();
        let width = column_names.len();

        let row_iter = reader
            .get_row_iter(None)
            .map_err(|e| Error::ParseError(format!("parquet row-iter failed: {e}")))?;

        // The reservation is only a hint, so never reserve more than the file can yield.
        // A huge `max_sample_rows` (e.g. "sample everything") would otherwise abort on
        // capacity overflow before a single row is read.
        let capacity = usize::try_from(total_rows)
            .unwrap_or(0)
            .min(options.max_sample_rows);
        let mut sample_rows: Vec<Row> = Vec::with_capacity(capacity);
        for row_result in row_iter.take(options.max_sample_rows) {
            let row = row_result
                .map_err(|e| Error::ParseError(format!("parquet row read failed: {e}")))?;
            // Pad short rows with nulls and drop surplus cells in one pass so every row
            // lines up index-for-index with `column_names`.
            let cells: Row = row
                .get_column_iter()
                .map(|(_, field)| field_to_value(field))
                .chain(std::iter::repeat_with(|| Value::Null))
                .take(width)
                .collect();
            sample_rows.push(cells);
        }

        let columns: Vec<Column> = column_names
            .iter()
            .enumerate()
            .map(|(idx, name)| {
                let col_samples: Vec<Value> = sample_rows
                    .iter()
                    .map(|r| r.get(idx).cloned().unwrap_or(Value::Null))
                    .collect();
                let (data_type, nullable) = infer_column_type(&col_samples);
                Column {
                    name: name.clone(),
                    data_type,
                    nullable,
                }
            })
            .collect();

        let mut metadata_map = std::collections::HashMap::new();
        metadata_map.insert(
            "num_row_groups".into(),
            metadata.num_row_groups().to_string(),
        );

        Ok(Table {
            columns,
            sample_rows,
            // The footer stores the count as an i64. A negative value means the footer is
            // bogus, so report the count as unknown instead of claiming an empty file.
            row_count: u64::try_from(total_rows).ok(),
            metadata: metadata_map,
        })
    }
}
/// Converts one decoded Parquet [`Field`] into the crate's format-neutral [`Value`].
///
/// - Every integer variant except `ULong` widens losslessly into [`Value::Integer`].
/// - A `ULong` that fits in `i64` also becomes an integer. A larger one is kept as its
///   exact decimal digits in [`Value::Text`] rather than being wrapped.
/// - `Float` is widened to `f64`; `Double` is passed through.
/// - `Date` is rendered with the parquet crate's `Display` impl.
/// - Millisecond and microsecond timestamps are also rendered with `Display`, with only
///   the first space (between date and time) turned into `T`.
/// - Any other variant falls back to its `Display` text.
fn field_to_value(field: &Field) -> Value {
    match *field {
        Field::Null => Value::Null,
        Field::Bool(flag) => Value::Bool(flag),

        // Widths that always fit in i64 widen without loss.
        Field::Byte(n) => Value::Integer(n.into()),
        Field::Short(n) => Value::Integer(n.into()),
        Field::Int(n) => Value::Integer(n.into()),
        Field::Long(n) => Value::Integer(n),
        Field::UByte(n) => Value::Integer(n.into()),
        Field::UShort(n) => Value::Integer(n.into()),
        Field::UInt(n) => Value::Integer(n.into()),
        // A u64 may exceed i64::MAX; keep the exact digits as text instead of wrapping.
        Field::ULong(n) => match i64::try_from(n) {
            Ok(signed) => Value::Integer(signed),
            Err(_) => Value::Text(n.to_string()),
        },

        Field::Float(x) => Value::Float(x.into()),
        Field::Double(x) => Value::Float(x),
        Field::Str(ref text) => Value::Text(text.clone()),

        Field::Date(_) => Value::Date(field.to_string()),
        // NOTE(review): only the date/time separator is rewritten here. If parquet's
        // Display renders a trailing " +HH:MM" offset, the result is not strict RFC 3339
        // — confirm what downstream consumers of Value::DateTime expect.
        Field::TimestampMillis(_) | Field::TimestampMicros(_) => {
            Value::DateTime(field.to_string().replacen(' ', "T", 1))
        }

        _ => Value::Text(field.to_string()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extensions_is_parquet_only() {
        assert_eq!(ParquetReader.extensions(), &["parquet"]);
    }

    #[test]
    fn name_identifies_backend() {
        assert_eq!(ParquetReader.name(), "parquet");
    }

    #[test]
    fn missing_file_returns_io_error() {
        let result = ParquetReader.read(Path::new("/nonexistent.parquet"), &ReadOptions::default());
        assert!(matches!(result, Err(Error::Io(_))));
    }

    #[test]
    fn invalid_parquet_returns_parse_error() {
        use std::io::Write;
        let mut f = tempfile::Builder::new()
            .suffix(".parquet")
            .tempfile()
            .unwrap();
        f.write_all(b"this is not a parquet file").unwrap();
        f.flush().unwrap();
        let result = ParquetReader.read(f.path(), &ReadOptions::default());
        assert!(matches!(result, Err(Error::ParseError(_))));
    }

    #[test]
    fn field_to_value_covers_basic_types() {
        assert_eq!(field_to_value(&Field::Null), Value::Null);
        assert_eq!(field_to_value(&Field::Bool(true)), Value::Bool(true));
        assert_eq!(field_to_value(&Field::Int(42)), Value::Integer(42));
        assert_eq!(
            field_to_value(&Field::Long(-1_234_567_890)),
            Value::Integer(-1_234_567_890)
        );
        assert_eq!(field_to_value(&Field::Double(2.5)), Value::Float(2.5));
        assert_eq!(
            field_to_value(&Field::Str("hi".into())),
            Value::Text("hi".into())
        );
    }

    #[test]
    fn narrow_and_unsigned_integers_widen_losslessly() {
        assert_eq!(field_to_value(&Field::Byte(-3)), Value::Integer(-3));
        assert_eq!(
            field_to_value(&Field::Short(i16::MIN)),
            Value::Integer(i64::from(i16::MIN))
        );
        assert_eq!(field_to_value(&Field::UByte(u8::MAX)), Value::Integer(255));
        assert_eq!(field_to_value(&Field::UShort(u16::MAX)), Value::Integer(65_535));
        assert_eq!(
            field_to_value(&Field::UInt(u32::MAX)),
            Value::Integer(4_294_967_295)
        );
    }

    #[test]
    fn float_widens_to_f64() {
        assert_eq!(field_to_value(&Field::Float(1.5)), Value::Float(1.5));
    }

    #[test]
    fn ulong_within_i64_range_stays_integer() {
        let small = Field::ULong(42);
        assert_eq!(field_to_value(&small), Value::Integer(42));
    }

    #[test]
    fn ulong_at_i64_max_stays_integer() {
        let boundary = Field::ULong(i64::MAX.unsigned_abs());
        assert_eq!(field_to_value(&boundary), Value::Integer(i64::MAX));
    }

    #[test]
    fn ulong_just_above_i64_max_falls_back_to_text() {
        let over = Field::ULong(i64::MAX.unsigned_abs() + 1);
        match field_to_value(&over) {
            Value::Text(s) => assert_eq!(s, "9223372036854775808"),
            other => panic!("expected Text fallback for i64::MAX + 1, got {other:?}"),
        }
    }

    #[test]
    fn ulong_beyond_i64_max_falls_back_to_text() {
        let huge = Field::ULong(u64::MAX);
        match field_to_value(&huge) {
            Value::Text(s) => assert_eq!(s, "18446744073709551615"),
            other => panic!("expected Text fallback for u64::MAX, got {other:?}"),
        }
    }

    #[test]
    #[ignore = "requires a real Parquet file at tests/fixtures/sample.parquet"]
    fn extracts_schema_and_samples_from_real_parquet() {
        let table = ParquetReader
            .read(
                Path::new("tests/fixtures/sample.parquet"),
                &ReadOptions::default(),
            )
            .expect("read failed");
        assert!(!table.columns.is_empty());
        assert!(!table.sample_rows.is_empty());
    }
}