/// Default maximum number of records examined when inferring a schema from raw files.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
pub mod arrow;
pub mod avro;
pub mod csv;
pub mod file_compression_type;
pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
pub mod write;
use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use std::task::Poll;
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use file_compression_type::FileCompressionType;
use futures::stream::BoxStream;
use futures::{ready, Stream, StreamExt};
use object_store::{ObjectMeta, ObjectStore};
/// Factory that produces [`FileFormat`] instances.
///
/// Implementations also expose a file extension through the [`GetExt`]
/// supertrait, and can be configured via string key/value options.
pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
    /// Builds a [`FileFormat`] from the session state and string-keyed
    /// format options.
    fn create(
        &self,
        state: &SessionState,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>>;

    /// Returns a [`FileFormat`] with this factory's default configuration.
    fn default(&self) -> Arc<dyn FileFormat>;

    /// Returns `self` as [`Any`] so callers can downcast to the concrete
    /// factory type.
    fn as_any(&self) -> &dyn Any;
}
/// A file format (e.g. CSV, JSON, Parquet) that can infer schemas and
/// statistics from objects in an [`ObjectStore`] and produce physical plans
/// for reading — and optionally writing — those files.
#[async_trait]
pub trait FileFormat: Send + Sync + Debug {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete
    /// format implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the file extension associated with this format.
    fn get_ext(&self) -> String;

    /// Returns the extension for this format combined with the given
    /// compression type; may error for unsupported combinations.
    fn get_ext_with_compression(
        &self,
        _file_compression_type: &FileCompressionType,
    ) -> Result<String>;

    /// Infers a schema common to the given objects by inspecting their
    /// contents in `store`.
    async fn infer_schema(
        &self,
        state: &SessionState,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef>;

    /// Infers statistics for a single object, expressed against
    /// `table_schema`.
    async fn infer_stats(
        &self,
        state: &SessionState,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics>;

    /// Creates an [`ExecutionPlan`] that scans the files described by
    /// `conf`, optionally pushing `filters` into the scan.
    async fn create_physical_plan(
        &self,
        state: &SessionState,
        conf: FileScanConfig,
        filters: Option<&Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Creates an [`ExecutionPlan`] that writes `_input`'s output to files
    /// per `_conf`. The default implementation errors with "not
    /// implemented"; formats that support writing override this.
    async fn create_writer_physical_plan(
        &self,
        _input: Arc<dyn ExecutionPlan>,
        _state: &SessionState,
        _conf: FileSinkConfig,
        _order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Writer not implemented for this format")
    }

    /// Reports whether this format can evaluate the given filters itself
    /// during the scan. Defaults to [`FilePushdownSupport::NoSupport`].
    fn supports_filters_pushdown(
        &self,
        _file_schema: &Schema,
        _table_schema: &Schema,
        _filters: &[&Expr],
    ) -> Result<FilePushdownSupport> {
        Ok(FilePushdownSupport::NoSupport)
    }
}
/// Answer returned by [`FileFormat::supports_filters_pushdown`], describing
/// whether a format can apply the given filters during the scan itself.
#[derive(Debug, PartialEq)]
pub enum FilePushdownSupport {
    /// The format does not support filter pushdown at all.
    NoSupport,
    /// The format supports pushdown in general, but not for these
    /// particular filters.
    NotSupportedForFilter,
    /// The given filters can be pushed down into the scan.
    Supported,
}
/// Possible outcomes of asking a [`BatchDeserializer`] for its next batch.
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
    /// A complete [`RecordBatch`] was decoded.
    RecordBatch(RecordBatch),
    /// More input must be supplied (via `digest`) before a batch can be
    /// produced.
    RequiresMoreData,
    /// Input was finalized and fully consumed; no further batches will
    /// be produced.
    InputExhausted,
}
/// Incrementally deserializes messages of type `T` into [`RecordBatch`]es.
pub trait BatchDeserializer<T>: Send + Debug {
    /// Feeds `message` into the deserializer's buffer, returning the
    /// amount of input accepted.
    fn digest(&mut self, message: T) -> usize;

    /// Attempts to produce the next batch from buffered input.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;

    /// Signals that no further input will arrive.
    fn finish(&mut self);
}
/// Low-level decoder that parses raw bytes into [`RecordBatch`]es; driven
/// by [`DecoderDeserializer`].
pub(crate) trait Decoder: Send + Debug {
    /// Decodes as much of `buf` as possible, returning the number of
    /// bytes consumed.
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;

    /// Emits any batch assembled so far, or `None` if nothing is buffered.
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;

    /// Whether `flush` may be called before the input is exhausted.
    fn can_flush_early(&self) -> bool;
}
impl<T: Decoder> Debug for DecoderDeserializer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only buffering state is shown; the inner decoder is
        // intentionally omitted from the output.
        f.debug_struct("Deserializer")
            .field("buffered_queue", &self.buffered_queue)
            .field("finalized", &self.finalized)
            .finish()
    }
}
impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
    fn digest(&mut self, message: Bytes) -> usize {
        // Empty chunks carry no data; skip queueing them.
        if message.is_empty() {
            return 0;
        }

        let consumed = message.len();
        self.buffered_queue.push_back(message);
        consumed
    }

    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
        // Drain queued chunks through the decoder until a batch is ready
        // or the buffer runs dry.
        while let Some(buffered) = self.buffered_queue.front_mut() {
            let decoded = self.decoder.decode(buffered)?;
            buffered.advance(decoded);
            if buffered.is_empty() {
                self.buffered_queue.pop_front();
            }

            // `decoded == 0` means the decoder made no progress on this
            // chunk (notably on the empty sentinel pushed by `finish`);
            // then — or when the decoder permits early flushing — try to
            // emit what has been assembled so far.
            // NOTE(review): if a decoder returns 0 on a non-empty chunk
            // while `flush` yields `Ok(None)`, this loop would spin;
            // presumably decoders guarantee progress — confirm.
            if decoded == 0 || self.decoder.can_flush_early() {
                return match self.decoder.flush() {
                    Ok(Some(batch)) => Ok(DeserializerOutput::RecordBatch(batch)),
                    Ok(None) => continue,
                    Err(e) => Err(e),
                };
            }
        }
        // Buffer exhausted: report end-of-input only after `finish`.
        if self.finalized {
            Ok(DeserializerOutput::InputExhausted)
        } else {
            Ok(DeserializerOutput::RequiresMoreData)
        }
    }

    fn finish(&mut self) {
        self.finalized = true;
        // Pushing an empty chunk makes the next `decode` return 0, which
        // forces a final `flush` in `next`.
        self.buffered_queue.push_back(Bytes::new());
    }
}
/// A [`BatchDeserializer`] that drives a [`Decoder`] over a queue of
/// buffered [`Bytes`] chunks.
pub(crate) struct DecoderDeserializer<T: Decoder> {
    /// The underlying byte decoder.
    pub(crate) decoder: T,
    /// Input chunks not yet fully consumed by `decoder`.
    pub(crate) buffered_queue: VecDeque<Bytes>,
    /// True once `finish` has been called; no more input will arrive.
    pub(crate) finalized: bool,
}
impl<T: Decoder> DecoderDeserializer<T> {
    /// Creates a deserializer wrapping `decoder`, starting with an empty
    /// chunk queue and the input not yet finalized.
    pub(crate) fn new(decoder: T) -> Self {
        Self {
            decoder,
            buffered_queue: VecDeque::default(),
            finalized: false,
        }
    }
}
/// Adapts a fallible stream of [`Bytes`] into a stream of
/// [`RecordBatch`]es by feeding each chunk through `deserializer`.
pub(crate) fn deserialize_stream<'a>(
    mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
    mut deserializer: impl BatchDeserializer<Bytes> + 'a,
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
    futures::stream::poll_fn(move |cx| loop {
        // Pull the next chunk; `None` means the input stream ended, so
        // tell the deserializer to finalize.
        match ready!(input.poll_next_unpin(cx)).transpose()? {
            Some(b) => _ = deserializer.digest(b),
            None => deserializer.finish(),
        };

        return match deserializer.next()? {
            DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
            DeserializerOutput::InputExhausted => Poll::Ready(None),
            // Not enough buffered data for a batch yet — poll for more.
            DeserializerOutput::RequiresMoreData => continue,
        };
    })
    .boxed()
}
/// A [`FileType`] implementation that simply wraps a [`FileFormatFactory`].
#[derive(Debug)]
pub struct DefaultFileType {
    /// The factory this file type delegates to.
    file_format_factory: Arc<dyn FileFormatFactory>,
}
impl DefaultFileType {
    /// Constructs a [`DefaultFileType`] backed by the given format factory.
    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
        Self { file_format_factory }
    }

    /// Returns a reference to the wrapped [`FileFormatFactory`].
    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
        &self.file_format_factory
    }
}
impl FileType for DefaultFileType {
    fn as_any(&self) -> &dyn Any {
        // Enables downcasting back to `DefaultFileType`, as done by
        // `file_type_to_format`.
        self
    }
}
impl Display for DefaultFileType {
    /// Displays the wrapped factory's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let factory = &self.file_format_factory;
        write!(f, "{factory:?}")
    }
}
impl GetExt for DefaultFileType {
    fn get_ext(&self) -> String {
        // Delegate to the wrapped factory's extension.
        self.file_format_factory.get_ext()
    }
}
pub fn format_as_file_type(
file_format_factory: Arc<dyn FileFormatFactory>,
) -> Arc<dyn FileType> {
Arc::new(DefaultFileType {
file_format_factory,
})
}
/// Recovers the [`FileFormatFactory`] from a [`FileType`] created by
/// [`format_as_file_type`].
///
/// # Errors
/// Returns an internal error if `file_type` is not a [`DefaultFileType`].
pub fn file_type_to_format(
    file_type: &Arc<dyn FileType>,
) -> Result<Arc<dyn FileFormatFactory>> {
    let any = file_type.as_ref().as_any();
    if let Some(default_file_type) = any.downcast_ref::<DefaultFileType>() {
        Ok(Arc::clone(&default_file_type.file_format_factory))
    } else {
        internal_err!("FileType was not DefaultFileType")
    }
}
/// Returns a copy of `field` with its data type replaced by `new_type`;
/// all other field properties are preserved.
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    let updated = field.as_ref().clone().with_data_type(new_type);
    Arc::new(updated)
}
/// Returns a new schema where `Utf8`/`LargeUtf8` fields become `Utf8View`
/// and `Binary`/`LargeBinary` fields become `BinaryView`; all other
/// fields and the schema metadata are carried over unchanged.
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| {
            let data_type = field.data_type();
            if matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
                field_with_new_type(field, DataType::Utf8View)
            } else if matches!(data_type, DataType::Binary | DataType::LargeBinary) {
                field_with_new_type(field, DataType::BinaryView)
            } else {
                Arc::clone(field)
            }
        })
        .collect();
    Schema::new_with_metadata(fields, schema.metadata.clone())
}
/// If `table_schema` uses any view types (`Utf8View`/`BinaryView`),
/// returns a copy of `file_schema` whose matching string/binary fields
/// (by field name) are upgraded to the corresponding view type.
/// Returns `None` when the table schema contains no view types.
pub(crate) fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;

    // Index table fields by name, noting whether any view types exist.
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    // No view types in the table schema: nothing to coerce.
    if !transform {
        return None;
    }

    // Upgrade only fields whose table-side type is a view type and whose
    // file-side type is the matching non-view string/binary type.
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}
/// Returns a new schema where binary-typed fields become the corresponding
/// string types (`Binary`→`Utf8`, `LargeBinary`→`LargeUtf8`,
/// `BinaryView`→`Utf8View`); other fields and metadata are unchanged.
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| {
            // Pick the string replacement, if any, for this field's type.
            let replacement = match field.data_type() {
                DataType::Binary => Some(DataType::Utf8),
                DataType::LargeBinary => Some(DataType::LargeUtf8),
                DataType::BinaryView => Some(DataType::Utf8View),
                _ => None,
            };
            match replacement {
                Some(new_type) => field_with_new_type(field, new_type),
                None => Arc::clone(field),
            }
        })
        .collect();
    Schema::new_with_metadata(fields, schema.metadata.clone())
}
/// If `table_schema` declares a string type (`Utf8`, `LargeUtf8`, or
/// `Utf8View`) where `file_schema` has a binary type for the same field
/// name, returns a copy of `file_schema` with those fields coerced to the
/// table's string type; `None` if no field needed coercion.
pub(crate) fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;

    // Index table fields by name for lookup while walking the file schema.
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();

    // `transform` is flipped inside the closure; this is sound because
    // `collect` below is eager, so the flag is final once it's checked.
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}
#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
use std::sync::Mutex;
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
/// Test helper: scans a single local file with `format` and returns the
/// resulting [`ExecutionPlan`], inferring schema and statistics first.
pub async fn scan_format(
    state: &SessionState,
    format: &dyn FileFormat,
    store_root: &str,
    file_name: &str,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let store = Arc::new(LocalFileSystem::new()) as _;
    let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

    let file_schema = format
        .infer_schema(state, &store, std::slice::from_ref(&meta))
        .await?;

    let statistics = format
        .infer_stats(state, &store, file_schema.clone(), &meta)
        .await?;

    // One file group containing the single file, with no partition values
    // and no byte-range restriction.
    let file_groups = vec![vec![PartitionedFile {
        object_meta: meta,
        partition_values: vec![],
        range: None,
        statistics: None,
        extensions: None,
        metadata_size_hint: None,
    }]];

    // Build the scan; the trailing `None` means no pushed-down filters.
    let exec = format
        .create_physical_plan(
            state,
            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                .with_file_groups(file_groups)
                .with_statistics(statistics)
                .with_projection(projection)
                .with_limit(limit),
            None,
        )
        .await?;
    Ok(exec)
}
/// Test-only [`ObjectStore`] whose `get` yields the same bytes a fixed
/// number of times, counting how many chunks were actually produced.
#[derive(Debug)]
pub struct VariableStream {
    /// Chunk returned on every iteration of the stream.
    bytes_to_repeat: Bytes,
    /// Number of times the chunk is repeated.
    max_iterations: usize,
    /// Shared counter of chunks produced so far.
    iterations_detected: Arc<Mutex<usize>>,
}
impl Display for VariableStream {
    /// Fixed human-readable name for this test store.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("VariableStream")
    }
}
// Only `get` is implemented; all other `ObjectStore` methods panic with
// `unimplemented!` since the tests using this store never call them.
#[async_trait]
impl ObjectStore for VariableStream {
    async fn put_opts(
        &self,
        _location: &Path,
        _payload: PutPayload,
        _opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        unimplemented!()
    }

    async fn put_multipart_opts(
        &self,
        _location: &Path,
        _opts: PutMultipartOpts,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        unimplemented!()
    }

    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
        let bytes = self.bytes_to_repeat.clone();
        // Total byte range covered by all repetitions.
        let range = 0..bytes.len() * self.max_iterations;
        let arc = self.iterations_detected.clone();
        // Each pull of the stream bumps the shared counter and yields a
        // fresh clone of the repeated chunk, capped at `max_iterations`.
        let stream = futures::stream::repeat_with(move || {
            let arc_inner = arc.clone();
            *arc_inner.lock().unwrap() += 1;
            Ok(bytes.clone())
        })
        .take(self.max_iterations)
        .boxed();

        Ok(GetResult {
            payload: GetResultPayload::Stream(stream),
            meta: ObjectMeta {
                location: location.clone(),
                last_modified: Default::default(),
                size: range.end,
                e_tag: None,
                version: None,
            },
            // NOTE(review): `range` here is `Default::default()` (0..0)
            // rather than the full byte range computed above — presumably
            // the tests never read it; confirm before relying on it.
            range: Default::default(),
            attributes: Attributes::default(),
        })
    }

    async fn get_opts(
        &self,
        _location: &Path,
        _opts: GetOptions,
    ) -> object_store::Result<GetResult> {
        unimplemented!()
    }

    async fn get_ranges(
        &self,
        _location: &Path,
        _ranges: &[Range<usize>],
    ) -> object_store::Result<Vec<Bytes>> {
        unimplemented!()
    }

    async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
        unimplemented!()
    }

    async fn delete(&self, _location: &Path) -> object_store::Result<()> {
        unimplemented!()
    }

    fn list(
        &self,
        _prefix: Option<&Path>,
    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
        unimplemented!()
    }

    async fn list_with_delimiter(
        &self,
        _prefix: Option<&Path>,
    ) -> object_store::Result<ListResult> {
        unimplemented!()
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        unimplemented!()
    }

    async fn copy_if_not_exists(
        &self,
        _from: &Path,
        _to: &Path,
    ) -> object_store::Result<()> {
        unimplemented!()
    }
}
impl VariableStream {
    /// Creates a stream that repeats `bytes_to_repeat` up to
    /// `max_iterations` times, with the iteration counter starting at zero.
    pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
        let iterations_detected = Arc::new(Mutex::new(0));
        Self {
            bytes_to_repeat,
            max_iterations,
            iterations_detected,
        }
    }

    /// Returns how many chunks `get` has handed out so far.
    pub fn get_iterations_detected(&self) -> usize {
        let guard = self.iterations_detected.lock().unwrap();
        *guard
    }
}
}