use std::any::Any;
use std::string::String;
use std::sync::Arc;
use arrow::datatypes::*;
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::ExecutionPlan;
/// Table provider backed by the Parquet data located at a path.
///
/// The schema and statistics are captured once, in [`ParquetTable::try_new`],
/// and cached on the struct; each scan builds a new [`ParquetExec`] for the
/// same path.
pub struct ParquetTable {
    /// Location handed to `ParquetExec::try_from_path`, both when the table is
    /// created and on every scan.
    path: String,
    /// Schema captured when the table was created.
    schema: SchemaRef,
    /// Statistics captured when the table was created.
    statistics: Statistics,
    /// Concurrency limit forwarded to `ParquetExec` when scanning.
    max_concurrency: usize,
}

impl ParquetTable {
    /// Creates a table for the Parquet data at `path`.
    ///
    /// The data is probed once to capture its schema and statistics;
    /// `max_concurrency` is stored and later used when scanning.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ParquetExec::try_from_path` while
    /// probing `path`.
    pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
        // Probe with no projection, batch size 0 and concurrency 1: this plan is
        // never executed, it is only inspected for its schema and statistics.
        let probe = ParquetExec::try_from_path(path, None, 0, 1)?;
        Ok(Self {
            path: path.to_owned(),
            schema: probe.schema(),
            statistics: probe.statistics().clone(),
            max_concurrency,
        })
    }

    /// Returns the path this table was created from.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }
}
impl TableProvider for ParquetTable {
    /// Exposes `self` as `Any` so callers can downcast back to `ParquetTable`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the schema cached at construction time (a cheap `Arc` clone).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Builds a new `ParquetExec` over the table's path.
    ///
    /// `projection` and `batch_size` are passed through unchanged, together
    /// with the table's `max_concurrency`. Filter expressions are accepted but
    /// not used here.
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
        _filters: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let exec = ParquetExec::try_from_path(
            &self.path,
            projection.clone(),
            batch_size,
            self.max_concurrency,
        )?;
        Ok(Arc::new(exec))
    }

    /// Returns a copy of the statistics captured at construction time.
    fn statistics(&self) -> Statistics {
        self.statistics.clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
        TimestampNanosecondArray,
    };
    use arrow::record_batch::RecordBatch;
    use futures::StreamExt;

    /// Scanning with a batch size of 2 yields 4 batches of 11 columns x 2 rows,
    /// and the cached statistics report 8 rows / 671 bytes.
    #[tokio::test]
    async fn read_small_batches() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = None;
        let exec = table.scan(&projection, 2, &[])?;
        let stream = exec.execute(0).await?;

        // Check every batch's shape while counting how many the stream yields.
        let count = stream
            .map(|batch| {
                let batch = batch.unwrap();
                assert_eq!(11, batch.num_columns());
                assert_eq!(2, batch.num_rows());
            })
            .fold(0, |acc, _| async move { acc + 1i32 })
            .await;

        assert_eq!(4, count);

        assert_eq!(table.statistics().num_rows, Some(8));
        assert_eq!(table.statistics().total_byte_size, Some(671));

        Ok(())
    }

    /// Checks the schema of `alltypes_plain.parquet` and that an unprojected
    /// scan returns all 11 columns and 8 rows in the first batch.
    #[tokio::test]
    async fn read_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;

        let fields: Vec<String> = table
            .schema()
            .fields()
            .iter()
            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
            .collect();
        let schema_description = fields.join("\n");

        assert_eq!(
            "id: Int32\n\
             bool_col: Boolean\n\
             tinyint_col: Int32\n\
             smallint_col: Int32\n\
             int_col: Int32\n\
             bigint_col: Int64\n\
             float_col: Float32\n\
             double_col: Float64\n\
             date_string_col: Binary\n\
             string_col: Binary\n\
             timestamp_col: Timestamp(Nanosecond, None)",
            schema_description
        );

        let projection = None;
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(11, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        Ok(())
    }

    /// Reads `bool_col` (column 1) as a `BooleanArray`.
    #[tokio::test]
    async fn read_bool_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![1]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        let values: Vec<bool> = (0..batch.num_rows()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[true, false, true, false, true, false, true, false]",
            format!("{:?}", values)
        );

        Ok(())
    }

    /// Reads `id` (column 0) as an `Int32Array`.
    #[tokio::test]
    async fn read_i32_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![0]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let values: Vec<i32> = (0..batch.num_rows()).map(|i| array.value(i)).collect();

        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));

        Ok(())
    }

    /// Reads `timestamp_col` (column 10) as a `TimestampNanosecondArray`.
    #[tokio::test]
    async fn read_i96_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![10]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let values: Vec<i64> = (0..batch.num_rows()).map(|i| array.value(i)).collect();

        assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{:?}", values));

        Ok(())
    }

    /// Reads `float_col` (column 6) as a `Float32Array`.
    #[tokio::test]
    async fn read_f32_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![6]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        let values: Vec<f32> = (0..batch.num_rows()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
            format!("{:?}", values)
        );

        Ok(())
    }

    /// Reads `double_col` (column 7) as a `Float64Array`.
    #[tokio::test]
    async fn read_f64_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![7]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let values: Vec<f64> = (0..batch.num_rows()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
            format!("{:?}", values)
        );

        Ok(())
    }

    /// Reads `string_col` (column 9) as a `BinaryArray` holding UTF-8 bytes.
    #[tokio::test]
    async fn read_binary_alltypes_plain_parquet() -> Result<()> {
        let table = load_table("alltypes_plain.parquet")?;
        let projection = Some(vec![9]);
        let batch = get_first_batch(table, &projection).await?;

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let values: Vec<&str> = (0..batch.num_rows())
            .map(|i| std::str::from_utf8(array.value(i)).unwrap())
            .collect();

        assert_eq!(
            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
            format!("{:?}", values)
        );

        Ok(())
    }

    /// Opens `name` from the directory returned by
    /// `arrow::util::test_util::parquet_test_data()` as a `ParquetTable` with
    /// `max_concurrency` 2.
    fn load_table(name: &str) -> Result<Box<dyn TableProvider>> {
        let testdata = arrow::util::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, name);
        let table = ParquetTable::try_new(&filename, 2)?;
        Ok(Box::new(table))
    }

    /// Scans `table` with `projection` and a batch size of 1024, then returns
    /// the first batch from partition 0.
    ///
    /// # Panics
    ///
    /// Panics if the stream ends without yielding any batch.
    async fn get_first_batch(
        table: Box<dyn TableProvider>,
        projection: &Option<Vec<usize>>,
    ) -> Result<RecordBatch> {
        let exec = table.scan(projection, 1024, &[])?;
        let mut it = exec.execute(0).await?;
        it.next()
            .await
            .expect("should have received at least one batch")
            .map_err(|e| e.into())
    }
}