use std::path::Path;
use crate::error::{IngestionError, IngestionResult};
use crate::types::{DataSet, DataType, Schema};
use polars::prelude::*;
use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
/// Reads a parquet file at `path`, validates its columns against `schema`,
/// and converts the result into a `DataSet`.
///
/// # Errors
/// Returns an `IngestionError` when the file cannot be scanned or collected
/// by polars, when a required column is missing, or when a column's physical
/// type is incompatible with the declared schema type.
pub fn ingest_parquet_from_path(
    path: impl AsRef<Path>,
    schema: &Schema,
) -> IngestionResult<DataSet> {
    let path = path.as_ref();
    // Build the lazy scan first so scan and collect failures map to
    // distinct error messages.
    let lazy = LazyFrame::scan_parquet(
        path.to_string_lossy().as_ref().into(),
        ScanArgsParquet::default(),
    )
    .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?;
    let frame = lazy
        .collect()
        .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
    validate_parquet_column_types(&frame, schema)?;
    dataframe_to_dataset(&frame, schema, "column", 1)
}
/// Checks that every field declared in `schema` exists in `df` and carries a
/// polars dtype compatible with the field's declared `DataType`.
///
/// # Errors
/// - `IngestionError::SchemaMismatch` when a required column is absent.
/// - `IngestionError::ParseError` when a column's dtype is incompatible.
fn validate_parquet_column_types(df: &DataFrame, schema: &Schema) -> IngestionResult<()> {
    for field in &schema.fields {
        let column = df.column(&field.name);
        let series = match column {
            Ok(c) => c.as_materialized_series(),
            Err(_) => {
                return Err(IngestionError::SchemaMismatch {
                    message: format!("missing required column '{}'", field.name),
                })
            }
        };
        let actual = series.dtype();
        if dtype_compatible_with_schema(&field.data_type, actual) {
            continue;
        }
        // Report the observed dtype as the raw value so the caller can see
        // what the file actually contained.
        return Err(IngestionError::ParseError {
            row: 1,
            column: field.name.clone(),
            raw: actual.to_string(),
            message: "parquet column type mismatch".to_string(),
        });
    }
    Ok(())
}
/// Returns `true` when a polars physical dtype is an acceptable source for
/// the given schema-level `DataType`.
///
/// Integer columns of any width/signedness satisfy both `Int64` and
/// `Float64` targets; floats additionally satisfy `Float64`.
/// NOTE(review): UInt64 is accepted for Int64, which can overflow on
/// conversion for values above i64::MAX — presumably handled downstream;
/// confirm in the conversion layer.
fn dtype_compatible_with_schema(
    schema_dtype: &DataType,
    polars_dtype: &polars::datatypes::DataType,
) -> bool {
    use polars::datatypes::DataType as P;
    // Shared integer-family test, reused by both numeric target types.
    let is_integer = matches!(
        polars_dtype,
        P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64
    );
    match schema_dtype {
        DataType::Utf8 => matches!(polars_dtype, P::String),
        DataType::Bool => matches!(polars_dtype, P::Boolean),
        DataType::Int64 => is_integer,
        DataType::Float64 => is_integer || matches!(polars_dtype, P::Float32 | P::Float64),
    }
}