mod avro;
#[cfg(test)]
mod chunked_store;
mod csv;
mod file_stream;
mod json;
mod parquet;
pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
    array::{new_null_array, ArrayData, ArrayRef, DictionaryArray, UInt16BufferBuilder},
    buffer::Buffer,
    datatypes::{DataType, Field, Schema, SchemaRef, UInt16Type},
    record_batch::{RecordBatch, RecordBatchOptions},
};
pub use avro::AvroExec;
use datafusion_physical_expr::PhysicalSortExpr;
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
use crate::datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};
use log::{debug, info};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
sync::Arc,
vec,
};
use super::{ColumnStatistics, Statistics};
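/// Wrap a partition value's type in a dictionary type with `UInt16` keys, the
/// representation used to materialize partition columns in record batches.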
pub fn partition_type_wrap(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
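/// The base configuration for creating a physical plan that scans a
/// collection of files, shared by all supported file formats.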
#[derive(Debug, Clone)]
pub struct FileScanConfig {
    /// Object store URL, used to look up the `ObjectStore` instance that serves the files.
    pub object_store_url: ObjectStoreUrl,
    /// Schema before `projection` is applied. It contains all the columns that may
    /// appear in the files, but not the table partition columns.
    pub file_schema: SchemaRef,
    /// List of files to be processed, grouped into partitions.
    pub file_groups: Vec<Vec<PartitionedFile>>,
    /// Estimated overall statistics of the files to be scanned.
    pub statistics: Statistics,
    /// Columns on which to project the data. Indexes that are higher than the
    /// number of columns in `file_schema` refer to `table_partition_cols`.
    pub projection: Option<Vec<usize>>,
    /// The maximum number of records to read from this plan. If `None`, all records are read.
    pub limit: Option<usize>,
    /// The partitioning columns, as `(name, type)` pairs.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// The order in which the data is sorted, if known.
    pub output_ordering: Option<Vec<PhysicalSortExpr>>,
    /// Whether this plan may produce an infinite (unbounded) stream of records.
    pub infinite_source: bool,
}
impl FileScanConfig {
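    /// Project the file schema and statistics onto the output schema of the scan,
    /// appending the table partition columns after the file columns.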
fn project(&self) -> (SchemaRef, Statistics) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
return (Arc::clone(&self.file_schema), self.statistics.clone());
}
let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
Some(proj) => Box::new(proj.iter().copied()),
None => Box::new(
0..(self.file_schema.fields().len() + self.table_partition_cols.len()),
),
};
let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_iter {
if idx < self.file_schema.fields().len() {
table_fields.push(self.file_schema.field(idx).clone());
if let Some(file_cols_stats) = &self.statistics.column_statistics {
table_cols_stats.push(file_cols_stats[idx].clone())
} else {
table_cols_stats.push(ColumnStatistics::default())
}
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(Field::new(
&self.table_partition_cols[partition_idx].0,
self.table_partition_cols[partition_idx].1.to_owned(),
false,
));
table_cols_stats.push(ColumnStatistics::default())
}
}
let table_stats = Statistics {
num_rows: self.statistics.num_rows,
is_exact: self.statistics.is_exact,
total_byte_size: None,
column_statistics: Some(table_cols_stats),
};
let table_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);
(table_schema, table_stats)
}
    /// The names of the file-schema columns selected by the projection,
    /// excluding any table partition columns.
    #[allow(unused)]
    fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
.map(|col_idx| self.file_schema.field(*col_idx).name())
.cloned()
.collect()
})
}
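    /// The projection indices that refer to columns of the file schema, with
    /// indices pointing at table partition columns filtered out.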
fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
.copied()
.collect()
})
}
}
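/// A wrapper to customize how partitioned groups of files are displayed.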
#[derive(Debug)]
struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
impl<'a> Display for FileGroupsDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
let mut first_group = true;
let groups = if self.0.len() == 1 { "group" } else { "groups" };
write!(f, "{{{} {}: [", self.0.len(), groups)?;
for group in self.0 {
if !first_group {
write!(f, ", ")?;
}
first_group = false;
write!(f, "[")?;
let mut first_file = true;
for pf in group {
if !first_file {
write!(f, ", ")?;
}
first_file = false;
write!(f, "{}", pf.object_meta.location.as_ref())?;
if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
}
}
write!(f, "]")?;
}
write!(f, "]}}")?;
Ok(())
}
}
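/// A wrapper to display a projected schema as `[field1, field2, ...]`.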
#[derive(Debug)]
struct ProjectSchemaDisplay<'a>(&'a SchemaRef);
impl<'a> Display for ProjectSchemaDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let parts: Vec<String> = self
            .0
            .fields()
            .iter()
            .map(|x| x.name().to_owned())
            .collect();
write!(f, "[{}]", parts.join(", "))
}
}
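/// Adapts file-level record batches to a table schema that may have been obtained
/// by merging several file-level schemas, e.g. to support schema evolution where
/// individual files contain only a subset of the table's columns.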
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
table_schema: SchemaRef,
}
impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}
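    /// Map a column index in the table schema to the index of the column with the
    /// same name in the given file schema, or `None` if the file lacks that column.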
pub(crate) fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
) -> Option<usize> {
let field = self.table_schema.field(index);
file_schema.index_of(field.name()).ok()
}
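    /// Map projection indices from the table schema to the corresponding indices in
    /// the file schema, skipping columns the file does not contain and returning an
    /// error if a matching column has an incompatible data type.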
pub fn map_projections(
&self,
file_schema: &Schema,
projections: &[usize],
) -> Result<Vec<usize>> {
let mut mapped: Vec<usize> = vec![];
for idx in projections {
let field = self.table_schema.field(*idx);
if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
if file_schema.field(mapped_idx).data_type() == field.data_type() {
mapped.push(mapped_idx)
} else {
                    let msg = format!(
                        "Failed to map column projection for field {}. Incompatible data types {:?} and {:?}",
                        field.name(),
                        file_schema.field(mapped_idx).data_type(),
                        field.data_type()
                    );
info!("{}", msg);
return Err(DataFusionError::Execution(msg));
}
}
}
Ok(mapped)
}
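    /// Re-order the columns of `batch` to match the table schema's column ordering
    /// for the given projection, inserting a null array for any projected column
    /// that the batch does not contain.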
pub fn adapt_batch(
&self,
batch: RecordBatch,
projections: &[usize],
) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_schema = batch.schema();
let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
let batch_cols = batch.columns().to_vec();
for field_idx in projections {
let table_field = &self.table_schema.fields()[*field_idx];
if let Some((batch_idx, _name)) =
batch_schema.column_with_name(table_field.name().as_str())
{
cols.push(batch_cols[batch_idx].clone());
} else {
cols.push(new_null_array(table_field.data_type(), batch_rows))
}
}
let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));
Ok(RecordBatch::try_new_with_options(
projected_schema,
cols,
&options,
)?)
}
}
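/// A helper that splices partition columns (as dictionary arrays) into the record
/// batches read from files, caching the dictionary key buffer between batches.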
struct PartitionColumnProjector {
    /// A zeroed key buffer cached across batches: partition columns are materialized
    /// as dictionary arrays with a single dictionary value, so every key is zero and
    /// the buffer can be reused for any partition column.
    key_buffer_cache: Option<Buffer>,
    /// Mapping from indexes in the list of partition columns to indexes in the target
    /// schema, sorted by target schema index so columns can be inserted in order.
    projected_partition_indexes: Vec<(usize, usize)>,
    /// The schema of the table once the projection is applied.
    projected_schema: SchemaRef,
}
impl PartitionColumnProjector {
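    /// Create a new projector. `projected_schema` is the target schema containing
    /// both file and partitioning columns; `table_partition_cols` lists the names
    /// of all partitioning columns.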
fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
let mut idx_map = HashMap::new();
for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
idx_map.insert(partition_idx, schema_idx);
}
}
let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
Self {
projected_partition_indexes,
key_buffer_cache: None,
projected_schema,
}
}
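    /// Transform a batch read from a file by inserting the partitioning columns at
    /// the positions deduced from `projected_schema`, with one partition value per
    /// partition column.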
fn project(
&mut self,
file_batch: RecordBatch,
partition_values: &[ScalarValue],
) -> Result<RecordBatch> {
let expected_cols =
self.projected_schema.fields().len() - self.projected_partition_indexes.len();
if file_batch.columns().len() != expected_cols {
return Err(DataFusionError::Execution(format!(
"Unexpected batch schema from file, expected {} cols but got {}",
expected_cols,
file_batch.columns().len()
)));
}
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
cols.insert(
sidx,
create_dict_array(
&mut self.key_buffer_cache,
&partition_values[pidx],
file_batch.num_rows(),
),
)
}
RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into)
}
}
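/// Create a dictionary array of length `len` in which every key points at the single
/// dictionary entry holding the partition value. The zeroed key buffer is cached in
/// `key_buffer_cache` and reused across calls.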
fn create_dict_array(
key_buffer_cache: &mut Option<Buffer>,
val: &ScalarValue,
len: usize,
) -> ArrayRef {
    // The dictionary values: a single-element array holding the partition value.
    let dict_vals = val.to_array();
    // Build the keys array; every key is zero since the dictionary has one entry.
    let sliced_key_buffer = match key_buffer_cache {
        // Reuse the cached zeroed buffer if it is large enough.
        Some(buf) if buf.len() >= len * 2 => buf.slice(buf.len() - len * 2),
        // Otherwise allocate a new zeroed buffer and cache it for later batches.
        _ => {
            let mut key_buffer_builder = UInt16BufferBuilder::new(len * 2);
            key_buffer_builder.advance(len * 2); // `advance` zero-fills the buffer
            key_buffer_cache.insert(key_buffer_builder.finish()).clone()
        }
};
let data_type = partition_type_wrap(val.get_datatype());
let mut builder = ArrayData::builder(data_type)
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.data().clone());
Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
))
}
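/// A file (or part of a file) to open, together with an optional byte range and
/// user-defined extensions.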
pub struct FileMeta {
    /// Path, size and last-modified metadata of the object to open.
    pub object_meta: ObjectMeta,
    /// An optional byte range limiting what part of the file is read.
    pub range: Option<FileRange>,
    /// Optional user-defined per-object extensions.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
impl FileMeta {
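    /// The full path to the object.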
pub fn location(&self) -> &Path {
&self.object_meta.location
}
}
impl From<ObjectMeta> for FileMeta {
fn from(object_meta: ObjectMeta) -> Self {
Self {
object_meta,
range: None,
extensions: None,
}
}
}
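/// Return the declared output ordering of the scan, or `None` when any file group
/// contains more than one file, since ordering across concatenated files cannot be
/// guaranteed.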
pub(crate) fn get_output_ordering(
    base_config: &FileScanConfig,
) -> Option<&[PhysicalSortExpr]> {
    base_config.output_ordering.as_deref().and_then(|output_ordering| {
        if base_config.file_groups.iter().any(|group| group.len() > 1) {
            debug!(
                "Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
                output_ordering, base_config.file_groups
            );
            None
        } else {
            Some(output_ordering)
        }
    })
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use crate::{
test::{build_table_i32, columns},
test_util::aggr_test_schema,
};
use super::*;
#[test]
fn physical_plan_config_no_projection() {
let file_schema = aggr_test_schema();
let conf = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::default(),
vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
);
let (proj_schema, proj_statistics) = conf.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
proj_schema.field(file_schema.fields().len()).name(),
"date",
"partition columns are the last columns"
);
assert_eq!(
proj_statistics
.column_statistics
.expect("projection creates column statistics")
.len(),
file_schema.fields().len() + 1
);
let col_names = conf.projected_file_column_names();
assert_eq!(col_names, None);
let col_indices = conf.file_column_projection_indices();
assert_eq!(col_indices, None);
}
#[test]
fn physical_plan_config_with_projection() {
let file_schema = aggr_test_schema();
let conf = config_for_projection(
Arc::clone(&file_schema),
Some(vec![file_schema.fields().len(), 0]),
Statistics {
num_rows: Some(10),
column_statistics: Some(
(0..file_schema.fields().len())
.map(|i| ColumnStatistics {
distinct_count: Some(i),
..Default::default()
})
.collect(),
),
..Default::default()
},
vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
);
let (proj_schema, proj_statistics) = conf.project();
assert_eq!(
columns(&proj_schema),
vec!["date".to_owned(), "c1".to_owned()]
);
let proj_stat_cols = proj_statistics
.column_statistics
.expect("projection creates column statistics");
assert_eq!(proj_stat_cols.len(), 2);
assert_eq!(proj_stat_cols[1].distinct_count, Some(0));
let col_names = conf.projected_file_column_names();
assert_eq!(col_names, Some(vec!["c1".to_owned()]));
let col_indices = conf.file_column_projection_indices();
assert_eq!(col_indices, Some(vec![0]));
}
#[test]
fn partition_column_projector() {
let file_batch = build_table_i32(
("a", &vec![0, 1, 2]),
("b", &vec![-2, -1, 0]),
("c", &vec![10, 11, 12]),
);
let partition_cols = vec![
("year".to_owned(), partition_type_wrap(DataType::Utf8)),
("month".to_owned(), partition_type_wrap(DataType::Utf8)),
("day".to_owned(), partition_type_wrap(DataType::Utf8)),
];
let conf = config_for_projection(
file_batch.schema(),
Some(vec![
0,
1,
2,
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::default(),
partition_cols.clone(),
);
let (proj_schema, _) = conf.project();
let mut proj = PartitionColumnProjector::new(
proj_schema,
&partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
);
let projected_batch = proj
.project(
file_batch,
&[
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
ScalarValue::Utf8(Some("26".to_owned())),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = vec![
"+---+----+----+------+-----+",
"| a | b | c | year | day |",
"+---+----+----+------+-----+",
"| 0 | -2 | 10 | 2021 | 26 |",
"| 1 | -1 | 11 | 2021 | 26 |",
"| 2 | 0 | 12 | 2021 | 26 |",
"+---+----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
let file_batch = build_table_i32(
("a", &vec![5, 6, 7, 8, 9]),
("b", &vec![-10, -9, -8, -7, -6]),
("c", &vec![12, 13, 14, 15, 16]),
);
let projected_batch = proj
.project(
file_batch,
&[
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
ScalarValue::Utf8(Some("27".to_owned())),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = vec![
"+---+-----+----+------+-----+",
"| a | b | c | year | day |",
"+---+-----+----+------+-----+",
"| 5 | -10 | 12 | 2021 | 27 |",
"| 6 | -9 | 13 | 2021 | 27 |",
"| 7 | -8 | 14 | 2021 | 27 |",
"| 8 | -7 | 15 | 2021 | 27 |",
"| 9 | -6 | 16 | 2021 | 27 |",
"+---+-----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
let file_batch = build_table_i32(
("a", &vec![0, 1, 3]),
("b", &vec![2, 3, 4]),
("c", &vec![4, 5, 6]),
);
let projected_batch = proj
.project(
file_batch,
&[
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
ScalarValue::Utf8(Some("28".to_owned())),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = vec![
"+---+---+---+------+-----+",
"| a | b | c | year | day |",
"+---+---+---+------+-----+",
"| 0 | 2 | 4 | 2021 | 28 |",
"| 1 | 3 | 5 | 2021 | 28 |",
"| 3 | 4 | 6 | 2021 | 28 |",
"+---+---+---+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
}
#[test]
fn schema_adapter_adapt_projections() {
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int8, true),
]));
let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
]);
let file_schema_2 = Arc::new(Schema::new(vec![
Field::new("c3", DataType::Int8, true),
Field::new("c2", DataType::Int64, true),
]));
let file_schema_3 =
Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
let adapter = SchemaAdapter::new(table_schema);
let projections1: Vec<usize> = vec![0, 1, 2];
let projections2: Vec<usize> = vec![2];
let mapped = adapter
.map_projections(&file_schema, projections1.as_slice())
.expect("mapping projections");
assert_eq!(mapped, vec![0, 1]);
let mapped = adapter
.map_projections(&file_schema, projections2.as_slice())
.expect("mapping projections");
assert!(mapped.is_empty());
let mapped = adapter
.map_projections(&file_schema_2, projections1.as_slice())
.expect("mapping projections");
assert_eq!(mapped, vec![1, 0]);
let mapped = adapter
.map_projections(&file_schema_2, projections2.as_slice())
.expect("mapping projections");
assert_eq!(mapped, vec![0]);
let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());
assert!(mapped.is_err());
}
fn config_for_projection(
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
table_partition_cols: Vec<(String, DataType)>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
file_groups: vec![vec![]],
limit: None,
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
projection,
statistics,
table_partition_cols,
output_ordering: None,
infinite_source: false,
}
}
#[test]
fn file_groups_display_empty() {
let expected = "{0 groups: []}";
assert_eq!(&FileGroupsDisplay(&[]).to_string(), expected);
}
#[test]
fn file_groups_display_one() {
let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
let expected = "{1 group: [[foo, bar]]}";
assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
}
#[test]
fn file_groups_display_many() {
let files = [
vec![partitioned_file("foo"), partitioned_file("bar")],
vec![partitioned_file("baz")],
vec![],
];
let expected = "{3 groups: [[foo, bar], [baz], []]}";
assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
}
fn partitioned_file(path: &str) -> PartitionedFile {
let object_meta = ObjectMeta {
location: object_store::path::Path::parse(path).unwrap(),
last_modified: Utc::now(),
size: 42,
};
PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}
}
}