mod arrow_file;
mod avro;
mod csv;
mod file_groups;
mod file_scan_config;
mod file_stream;
mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
use datafusion_expr::dml::InsertOp;
pub use file_groups::FileGroupPartitioner;
pub use file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
};
pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
pub use json::{JsonOpener, NdJsonExec};
use std::{
fmt::{Debug, Formatter, Result as FmtResult},
ops::Range,
sync::Arc,
vec,
};
use super::{file_format::write::demux::start_demuxer_task, listing::ListingTableUrl};
use crate::datasource::file_format::write::demux::DemuxedStreamReceiver;
use crate::error::Result;
use crate::physical_plan::{DisplayAs, DisplayFormatType};
use crate::{
datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
},
physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::insert::DataSink;
use async_trait::async_trait;
use futures::StreamExt;
use log::debug;
use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore};
/// Trait implemented by sinks that stream record batches out to files in
/// object storage.
///
/// Implementors supply the sink configuration and the writer-task logic;
/// the provided `write_all` method wires demuxing and writing together.
#[async_trait]
pub trait FileSink: DataSink {
    /// Returns the configuration describing where and how files are written.
    fn config(&self) -> &FileSinkConfig;

    /// Spawns the per-file writer tasks, drives them to completion together
    /// with the demux task, and returns the total number of rows written.
    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64>;

    /// Streams `data` out to files: resolves the object store from the task
    /// context, starts the demuxer, then hands its per-file streams to the
    /// writer tasks. Returns the total number of rows written.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let sink_config = self.config();
        let store = context
            .runtime_env()
            .object_store(&sink_config.object_store_url)?;
        let (demux_task, file_rx) = start_demuxer_task(sink_config, data, context);
        self.spawn_writer_tasks_and_join(context, demux_task, file_rx, store)
            .await
    }
}
/// Configuration for sinks that write query output to files.
pub struct FileSinkConfig {
    /// URL of the object store the output files are written to.
    pub object_store_url: ObjectStoreUrl,
    // NOTE(review): presumably the table's existing partitioned files that
    // the write interacts with — confirm against callers.
    pub file_groups: Vec<PartitionedFile>,
    /// Table paths the sink writes into.
    pub table_paths: Vec<ListingTableUrl>,
    /// Schema of the rows to be written (see [`FileSinkConfig::output_schema`]).
    pub output_schema: SchemaRef,
    /// Partition column names and types used for partitioned output.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Insert semantics requested by the query (e.g. append vs. overwrite).
    pub insert_op: InsertOp,
    /// Whether partition-by columns are retained in the written files.
    pub keep_partition_by_columns: bool,
    /// File extension used for newly created files.
    pub file_extension: String,
}
impl FileSinkConfig {
    /// Returns the schema of the rows this sink writes.
    pub fn output_schema(&self) -> &SchemaRef {
        &self.output_schema
    }
}
impl Debug for FileScanConfig {
    /// Debug output: object store URL and statistics, followed by the
    /// verbose `DisplayAs` rendering of the config.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(
            f,
            "object_store_url={:?}, statistics={:?}, ",
            self.object_store_url, self.statistics
        )?;
        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)
    }
}
impl DisplayAs for FileScanConfig {
    /// Writes the scan's file groups, projection, limit, output orderings,
    /// and constraints in display form.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let (projected_schema, _, _, projected_orderings) = self.project();

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        // Only mention the projection when at least one column survives it.
        if !projected_schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&projected_schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &projected_orderings)?;

        // Constraints are only shown when present.
        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}
/// Display helper for a slice of file groups, rendered as
/// `{N group(s): [[file, ...], ...]}` by its `DisplayAs` impl.
#[derive(Debug)]
struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
impl DisplayAs for FileGroupsDisplay<'_> {
    /// Renders `{N group(s): [...]}`. Default mode truncates the listing to
    /// five groups (appending ", ..."); Verbose mode prints every group.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let group_count = self.0.len();
        let noun = match group_count {
            1 => "group",
            _ => "groups",
        };
        write!(f, "{{{group_count} {noun}: [")?;
        match t {
            DisplayFormatType::Default => {
                // Abbreviated form: show at most five groups.
                fmt_up_to_n_elements(self.0, 5, f, |group, f| {
                    FileGroupDisplay(group).fmt_as(t, f)
                })?;
            }
            DisplayFormatType::Verbose => {
                // Verbose form: show every group.
                fmt_elements_split_by_commas(self.0.iter(), f, |group, f| {
                    FileGroupDisplay(group).fmt_as(t, f)
                })?;
            }
        }
        write!(f, "]}}")
    }
}
/// Display helper for a single file group, rendered as
/// `[path, path:start..end, ...]` by its `DisplayAs` impl.
#[derive(Debug)]
pub(crate) struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]);
impl DisplayAs for FileGroupDisplay<'_> {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
write!(f, "[")?;
match t {
DisplayFormatType::Default => {
let max_files = 5;
fmt_up_to_n_elements(self.0, max_files, f, |pf, f| {
write!(f, "{}", pf.object_meta.location.as_ref())?;
if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
}
Ok(())
})?
}
DisplayFormatType::Verbose => {
fmt_elements_split_by_commas(self.0.iter(), f, |pf, f| {
write!(f, "{}", pf.object_meta.location.as_ref())?;
if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
}
Ok(())
})?
}
}
write!(f, "]")
}
}
/// Formats at most `n` elements of `elements` separated by `", "`, appending
/// `", ..."` when more than `n` elements exist.
fn fmt_up_to_n_elements<E, F>(
    elements: &[E],
    n: usize,
    f: &mut Formatter,
    format_element: F,
) -> FmtResult
where
    F: Fn(&E, &mut Formatter) -> FmtResult,
{
    let len = elements.len();
    // `format_element` already has the shape the comma-splitter expects;
    // no wrapper closure needed.
    fmt_elements_split_by_commas(elements.iter().take(n), f, format_element)?;
    if len > n {
        // Signal that some elements were elided.
        write!(f, ", ...")?;
    }
    Ok(())
}

/// Formats every element yielded by `iter`, separated by `", "`, using
/// `format_element` for each one.
fn fmt_elements_split_by_commas<E, I, F>(
    iter: I,
    f: &mut Formatter,
    format_element: F,
) -> FmtResult
where
    I: Iterator<Item = E>,
    F: Fn(E, &mut Formatter) -> FmtResult,
{
    for (idx, element) in iter.enumerate() {
        // Separator goes before every element except the first.
        if idx > 0 {
            write!(f, ", ")?;
        }
        format_element(element, f)?;
    }
    Ok(())
}
/// Describes a single file (or a byte sub-range of one) to be opened.
pub struct FileMeta {
    /// Object store metadata for the file (location, size, etc.).
    pub object_meta: ObjectMeta,
    /// Optional byte range of the file to read; `None` means the whole file.
    pub range: Option<FileRange>,
    /// Opaque, caller-provided extension data attached to this file.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    // NOTE(review): presumably a prefetch-size hint for file metadata
    // (e.g. a parquet footer) — confirm with the readers that consume it.
    pub metadata_size_hint: Option<usize>,
}
impl FileMeta {
    /// Returns the file's location (path) within its object store.
    pub fn location(&self) -> &Path {
        &self.object_meta.location
    }
}
impl From<ObjectMeta> for FileMeta {
    /// Builds a `FileMeta` covering the whole file: no byte range, no
    /// extensions, and no metadata size hint.
    fn from(object_meta: ObjectMeta) -> Self {
        Self {
            object_meta,
            range: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}
/// Projects each ordering in `base_config.output_ordering` onto
/// `projected_schema`, keeping only those that survive the projection and
/// whose multi-file groups can be shown (via min/max statistics) to be
/// sorted.
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let mut all_orderings = vec![];
    for output_ordering in &base_config.output_ordering {
        let mut new_ordering = LexOrdering::default();
        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                let name = col.name();
                if let Some((idx, _)) = projected_schema.column_with_name(name) {
                    // Rebind the column to its index in the projected schema.
                    new_ordering.push(PhysicalSortExpr {
                        expr: Arc::new(Column::new(name, idx)),
                        options: *options,
                    });
                    continue;
                }
            }
            // A sort key that is not a projected column truncates the
            // ordering prefix we can preserve.
            break;
        }
        // If the leading sort key was dropped by the projection, there is
        // no ordering to report for this entry.
        if new_ordering.is_empty() {
            continue;
        }
        // A group with more than one file only preserves the ordering if
        // statistics prove the files are sorted relative to each other.
        if base_config.file_groups.iter().any(|group| {
            if group.len() <= 1 {
                // A single file (or empty group) is trivially sorted.
                return false;
            }
            let statistics = match statistics::MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                base_config.projection.as_deref(),
                group,
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    // Statistics unavailable: conservatively treat the
                    // group as unsorted.
                    return true;
                }
            };
            !statistics.is_sorted()
        }) {
            // Fix: log the ordering actually being skipped rather than
            // always `output_ordering[0]`.
            debug!(
                "Skipping specified output ordering {:?}. \
                Some file groups couldn't be determined to be sorted: {:?}",
                output_ordering, base_config.file_groups
            );
            continue;
        }
        all_orderings.push(new_ordering);
    }
    all_orderings
}
/// Outcome of [`calculate_range`]: either a byte range to scan, or an
/// instruction that this partition has nothing to read.
enum RangeCalculation {
    /// Scan this byte range; `None` means the entire file.
    Range(Option<Range<usize>>),
    /// The adjusted range is empty — produce no data for this partition.
    TerminateEarly,
}
/// Computes the byte range a partition should scan of a delimiter-separated
/// file, adjusting the assigned [`FileRange`] so reads start and end on
/// record boundaries.
///
/// `terminator` overrides the delimiter byte; it defaults to `\n`.
/// Returns [`RangeCalculation::TerminateEarly`] when the adjusted range is
/// empty, and `Range(None)` when no range was assigned (whole file).
async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');
    match file_meta.range {
        // No assigned range: scan the whole file.
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let (start, end) = (start as usize, end as usize);
            // Unless the range starts at the beginning of the file, shift
            // the start forward past the first delimiter found at or after
            // `start - 1`, so this partition begins on a record boundary.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };
            // Likewise extend the end to the next delimiter so the last
            // record is read in full (the following partition skips it via
            // its own start adjustment).
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };
            let range = start + start_delta..end + end_delta;
            // An empty adjusted range means no complete record falls in
            // this partition.
            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }
            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}
/// Streams bytes of `location` in `[start, end)` and returns the offset,
/// relative to `start`, of the first occurrence of `newline`. If the byte
/// never appears, the total number of bytes scanned is returned.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: usize,
    end: usize,
    newline: u8,
) -> Result<usize> {
    let get_options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };
    let response = object_store.get_opts(location, get_options).await?;
    let mut byte_stream = response.into_stream();

    // Bytes consumed from chunks that did not contain the delimiter.
    let mut offset = 0;
    while let Some(chunk) = byte_stream.next().await.transpose()? {
        match chunk.iter().position(|&byte| byte == newline) {
            Some(pos) => return Ok(offset + pos),
            None => offset += chunk.len(),
        }
    }
    Ok(offset)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
    use arrow_array::{
        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
        StringArray, UInt64Array,
    };
    use arrow_schema::{Field, Schema};
    use crate::datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
    };
    use chrono::Utc;

    // Verifies that the schema mapping casts file columns (UInt64 -> UInt32,
    // Float32 -> Float64) to the table schema when mapping a batch.
    #[test]
    fn schema_mapping_map_batch() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt32, true),
            Field::new("c3", DataType::Float64, true),
        ]));
        let adapter = DefaultSchemaAdapterFactory
            .create(table_schema.clone(), table_schema.clone());
        // File schema uses wider/narrower types than the table schema on
        // purpose, so the mapping must cast.
        let file_schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt64, true),
            Field::new("c3", DataType::Float32, true),
        ]);
        let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed");
        let c1 = StringArray::from(vec!["hello", "world"]);
        let c2 = UInt64Array::from(vec![9_u64, 5_u64]);
        let c3 = Float32Array::from(vec![2.0_f32, 7.0_f32]);
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
        )
        .unwrap();
        let mapped_batch = mapping.map_batch(batch).unwrap();
        // The mapped batch must carry the table schema and the cast values.
        assert_eq!(mapped_batch.schema(), table_schema);
        assert_eq!(mapped_batch.num_columns(), 3);
        assert_eq!(mapped_batch.num_rows(), 2);
        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<UInt32Type>();
        let c3 = mapped_batch.column(2).as_primitive::<Float64Type>();
        assert_eq!(c1.value(0), "hello");
        assert_eq!(c1.value(1), "world");
        assert_eq!(c2.value(0), 9_u32);
        assert_eq!(c2.value(1), 5_u32);
        assert_eq!(c3.value(0), 2.0_f64);
        assert_eq!(c3.value(1), 7.0_f64);
    }

    // Verifies schema mapping combined with a column projection: only the
    // projected columns (c1, c2, c4) are mapped, with casts applied.
    #[test]
    fn schema_adapter_map_schema_with_projection() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c0", DataType::Utf8, true),
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Int32, true),
            Field::new("c4", DataType::Float32, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("c1", DataType::Boolean, true),
            Field::new("c2", DataType::Float32, true),
            Field::new("c3", DataType::Binary, true),
            Field::new("c4", DataType::Int64, true),
        ]);
        // Project table columns c1, c2 and c4 only.
        let indices = vec![1, 2, 4];
        let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
        let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
        let c2 = Float32Array::from(vec![Some(2.0_f32), Some(7.0_f32), Some(3.0_f32)]);
        let c3 = BinaryArray::from_opt_vec(vec![
            Some(b"hallo"),
            Some(b"danke"),
            Some(b"super"),
        ]);
        let c4 = Int64Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![
                Arc::new(id),
                Arc::new(c1),
                Arc::new(c2),
                Arc::new(c3),
                Arc::new(c4),
            ],
        )
        .unwrap();
        let rows_num = batch.num_rows();
        let projected = batch.project(&projection).unwrap();
        let mapped_batch = mapping.map_batch(projected).unwrap();
        assert_eq!(
            mapped_batch.schema(),
            Arc::new(table_schema.project(&indices).unwrap())
        );
        assert_eq!(mapped_batch.num_columns(), indices.len());
        assert_eq!(mapped_batch.num_rows(), rows_num);
        // Booleans are cast to strings, Float32 widened to Float64, and
        // Int64 narrowed to Float32 per the table schema.
        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<Float64Type>();
        let c4 = mapped_batch.column(2).as_primitive::<Float32Type>();
        assert_eq!(c1.value(0), "true");
        assert_eq!(c1.value(1), "false");
        assert_eq!(c1.value(2), "true");
        assert_eq!(c2.value(0), 2.0_f64);
        assert_eq!(c2.value(1), 7.0_f64);
        assert_eq!(c2.value(2), 3.0_f64);
        assert_eq!(c4.value(0), 1.0_f32);
        assert_eq!(c4.value(1), 2.0_f32);
        assert_eq!(c4.value(2), 3.0_f32);
    }

    // Empty slice uses the plural "groups".
    #[test]
    fn file_groups_display_empty() {
        let expected = "{0 groups: []}";
        assert_eq!(DefaultDisplay(FileGroupsDisplay(&[])).to_string(), expected);
    }

    // A single group uses the singular "group".
    #[test]
    fn file_groups_display_one() {
        let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
        let expected = "{1 group: [[foo, bar]]}";
        assert_eq!(
            DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
            expected
        );
    }

    // At or below the 5-group default cap, no truncation marker is shown.
    #[test]
    fn file_groups_display_many_default() {
        let files = [
            vec![partitioned_file("foo"), partitioned_file("bar")],
            vec![partitioned_file("baz")],
            vec![],
        ];
        let expected = "{3 groups: [[foo, bar], [baz], []]}";
        assert_eq!(
            DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
            expected
        );
    }

    // Verbose mode shows the same output when under the cap.
    #[test]
    fn file_groups_display_many_verbose() {
        let files = [
            vec![partitioned_file("foo"), partitioned_file("bar")],
            vec![partitioned_file("baz")],
            vec![],
        ];
        let expected = "{3 groups: [[foo, bar], [baz], []]}";
        assert_eq!(
            VerboseDisplay(FileGroupsDisplay(&files)).to_string(),
            expected
        );
    }

    // Default mode truncates after 5 groups and appends ", ...".
    #[test]
    fn file_groups_display_too_many_default() {
        let files = [
            vec![partitioned_file("foo"), partitioned_file("bar")],
            vec![partitioned_file("baz")],
            vec![partitioned_file("qux")],
            vec![partitioned_file("quux")],
            vec![partitioned_file("quuux")],
            vec![partitioned_file("quuuux")],
            vec![],
        ];
        let expected = "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], ...]}";
        assert_eq!(
            DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
            expected
        );
    }

    // Verbose mode never truncates, even past the Default cap.
    #[test]
    fn file_groups_display_too_many_verbose() {
        let files = [
            vec![partitioned_file("foo"), partitioned_file("bar")],
            vec![partitioned_file("baz")],
            vec![partitioned_file("qux")],
            vec![partitioned_file("quux")],
            vec![partitioned_file("quuux")],
            vec![partitioned_file("quuuux")],
            vec![],
        ];
        let expected =
            "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], [quuuux], []]}";
        assert_eq!(
            VerboseDisplay(FileGroupsDisplay(&files)).to_string(),
            expected
        );
    }

    // A single group renders as a flat list of its files.
    #[test]
    fn file_group_display_many_default() {
        let files = vec![partitioned_file("foo"), partitioned_file("bar")];
        let expected = "[foo, bar]";
        assert_eq!(
            DefaultDisplay(FileGroupDisplay(&files)).to_string(),
            expected
        );
    }

    // Default mode truncates a single group after 5 files.
    #[test]
    fn file_group_display_too_many_default() {
        let files = vec![
            partitioned_file("foo"),
            partitioned_file("bar"),
            partitioned_file("baz"),
            partitioned_file("qux"),
            partitioned_file("quux"),
            partitioned_file("quuux"),
        ];
        let expected = "[foo, bar, baz, qux, quux, ...]";
        assert_eq!(
            DefaultDisplay(FileGroupDisplay(&files)).to_string(),
            expected
        );
    }

    // Verbose mode shows every file in the group.
    #[test]
    fn file_group_display_too_many_verbose() {
        let files = vec![
            partitioned_file("foo"),
            partitioned_file("bar"),
            partitioned_file("baz"),
            partitioned_file("qux"),
            partitioned_file("quux"),
            partitioned_file("quuux"),
        ];
        let expected = "[foo, bar, baz, qux, quux, quuux]";
        assert_eq!(
            VerboseDisplay(FileGroupDisplay(&files)).to_string(),
            expected
        );
    }

    // Builds a minimal PartitionedFile at `path` for the display tests.
    fn partitioned_file(path: &str) -> PartitionedFile {
        let object_meta = ObjectMeta {
            location: Path::parse(path).unwrap(),
            last_modified: Utc::now(),
            size: 42,
            e_tag: None,
            version: None,
        };
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}