/// Default value for the maximum number of records read when inferring a schema.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
pub mod arrow;
pub mod avro;
pub mod csv;
pub mod file_compression_type;
pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
pub mod write;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::sync::Arc;
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use file_compression_type::FileCompressionType;
use object_store::{ObjectMeta, ObjectStore};
use std::fmt::Debug;
/// Factory that produces [`FileFormat`] instances, either configured from
/// string-keyed options or with default settings.
pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
/// Builds a [`FileFormat`] configured from the given `format_options`.
fn create(
&self,
state: &SessionState,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>>;
/// Builds a [`FileFormat`] with this factory's default configuration.
fn default(&self) -> Arc<dyn FileFormat>;
/// Returns `self` as [`Any`] to allow dynamic downcasting.
fn as_any(&self) -> &dyn Any;
}
#[async_trait]
/// Abstraction over a file format (CSV, JSON, Avro, Parquet, ...) that can
/// infer schemas and statistics from objects in an [`ObjectStore`] and build
/// physical plans for reading and, optionally, writing files of that format.
pub trait FileFormat: Send + Sync + fmt::Debug {
/// Returns `self` as [`Any`] to allow dynamic downcasting.
fn as_any(&self) -> &dyn Any;
/// Returns the file extension for this format.
// NOTE(review): whether the string includes a leading dot is determined by
// the implementors, not visible here — confirm against them.
fn get_ext(&self) -> String;
/// Returns the extension adjusted for the given compression type; may fail
/// for unsupported format/compression combinations.
fn get_ext_with_compression(
&self,
_file_compression_type: &FileCompressionType,
) -> Result<String>;
/// Infers a schema common to the given `objects` stored in `store`.
async fn infer_schema(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef>;
/// Infers statistics for a single `object`, relative to `table_schema`.
async fn infer_stats(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
/// Builds an [`ExecutionPlan`] that scans files according to `conf`,
/// optionally applying the given pushed-down `filters`.
async fn create_physical_plan(
&self,
state: &SessionState,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>>;
/// Builds an [`ExecutionPlan`] that writes `_input`'s output as files of
/// this format. The default implementation reports "not implemented", so
/// read-only formats need not override it.
async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
_order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
}
#[derive(Debug)]
/// A [`FileType`] backed by a [`FileFormatFactory`], bridging the
/// `datafusion_common` file-type abstraction to concrete format factories.
pub struct DefaultFileType {
// The factory this file type delegates to (see `file_type_to_format`).
file_format_factory: Arc<dyn FileFormatFactory>,
}
impl DefaultFileType {
pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
Self {
file_format_factory,
}
}
pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
&self.file_format_factory
}
}
impl FileType for DefaultFileType {
/// Returns `self` as [`Any`]; used by `file_type_to_format` to downcast.
fn as_any(&self) -> &dyn Any {
self
}
}
impl Display for DefaultFileType {
    /// Displays as the `Debug` rendering of the wrapped factory.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Identical to `write!(f, "{:?}", ...)`, written via its expansion.
        f.write_fmt(format_args!("{:?}", self.file_format_factory))
    }
}
impl GetExt for DefaultFileType {
/// Delegates the extension lookup to the wrapped factory.
fn get_ext(&self) -> String {
self.file_format_factory.get_ext()
}
}
pub fn format_as_file_type(
file_format_factory: Arc<dyn FileFormatFactory>,
) -> Arc<dyn FileType> {
Arc::new(DefaultFileType {
file_format_factory,
})
}
/// Recovers the [`FileFormatFactory`] from a [`FileType`] previously created
/// by [`format_as_file_type`].
///
/// # Errors
/// Returns an internal error if `file_type` is not a [`DefaultFileType`].
pub fn file_type_to_format(
    file_type: &Arc<dyn FileType>,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
    let Some(default_file_type) = file_type
        .as_ref()
        .as_any()
        .downcast_ref::<DefaultFileType>()
    else {
        return internal_err!("FileType was not DefaultFileType");
    };
    Ok(Arc::clone(&default_file_type.file_format_factory))
}
/// Returns a copy of `schema` in which string columns (`Utf8`/`LargeUtf8`)
/// become `Utf8View` and binary columns (`Binary`/`LargeBinary`) become
/// `BinaryView`; all other columns are shared unchanged.
///
/// Fix: the previous version rebuilt converted fields with
/// `Field::new(name, type, nullable)`, which silently dropped each field's
/// own metadata (and any other field properties). Cloning the field and
/// changing only its data type preserves them.
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                // `with_data_type` keeps name, nullability, and metadata.
                field.as_ref().clone().with_data_type(DataType::Utf8View),
            ),
            DataType::Binary | DataType::LargeBinary => Arc::new(
                field.as_ref().clone().with_data_type(DataType::BinaryView),
            ),
            // Non-string/binary fields are shared as-is (cheap Arc clone).
            _ => field.clone(),
        })
        .collect();
    // Schema-level metadata is carried over unchanged.
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}
#[cfg(test)]
/// Test-only helpers shared by the file-format unit tests.
pub(crate) mod test_util {
use std::ops::Range;
use std::sync::Mutex;
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
/// Builds an [`ExecutionPlan`] that scans the single local file
/// `{store_root}/{file_name}` using `format`, first inferring the file's
/// schema and statistics through the format's own inference methods.
pub async fn scan_format(
state: &SessionState,
format: &dyn FileFormat,
store_root: &str,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));
let file_schema = format.infer_schema(state, &store, &[meta.clone()]).await?;
let statistics = format
.infer_stats(state, &store, file_schema.clone(), &meta)
.await?;
// One file group holding the single file, with no partition values,
// byte range, per-file statistics, or extensions.
let file_groups = vec![vec![PartitionedFile {
object_meta: meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}]];
// No filters are pushed down (`None`) in this helper.
let exec = format
.create_physical_plan(
state,
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
None,
)
.await?;
Ok(exec)
}
#[derive(Debug)]
/// An [`ObjectStore`] stub whose `get` yields `bytes_to_repeat` repeated
/// `max_iterations` times, counting each produced chunk. Only `get` is
/// supported; every other operation panics with `unimplemented!()`.
pub struct VariableStream {
// Payload produced for each chunk of the simulated stream.
bytes_to_repeat: Bytes,
// Number of chunks the stream yields before terminating.
max_iterations: usize,
// Shared counter, incremented once per chunk actually produced.
iterations_detected: Arc<Mutex<usize>>,
}
impl std::fmt::Display for VariableStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "VariableStream")
}
}
#[async_trait]
impl ObjectStore for VariableStream {
async fn put_opts(
&self,
_location: &Path,
_payload: PutPayload,
_opts: PutOptions,
) -> object_store::Result<PutResult> {
unimplemented!()
}
async fn put_multipart_opts(
&self,
_location: &Path,
_opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
unimplemented!()
}
// The only supported operation: returns a stream of `max_iterations`
// copies of `bytes_to_repeat`, bumping `iterations_detected` per chunk.
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let bytes = self.bytes_to_repeat.clone();
// Total size of the repeated payload; `range.end` is used as the
// reported object size below.
let range = 0..bytes.len() * self.max_iterations;
let arc = self.iterations_detected.clone();
let stream = futures::stream::repeat_with(move || {
let arc_inner = arc.clone();
// Count each chunk as it is pulled from the stream.
*arc_inner.lock().unwrap() += 1;
Ok(bytes.clone())
})
.take(self.max_iterations)
.boxed();
Ok(GetResult {
payload: GetResultPayload::Stream(stream),
meta: ObjectMeta {
location: location.clone(),
last_modified: Default::default(),
size: range.end,
e_tag: None,
version: None,
},
range: Default::default(),
attributes: Attributes::default(),
})
}
async fn get_opts(
&self,
_location: &Path,
_opts: GetOptions,
) -> object_store::Result<GetResult> {
unimplemented!()
}
async fn get_ranges(
&self,
_location: &Path,
_ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
unimplemented!()
}
async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
unimplemented!()
}
async fn delete(&self, _location: &Path) -> object_store::Result<()> {
unimplemented!()
}
fn list(
&self,
_prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
unimplemented!()
}
async fn list_with_delimiter(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
unimplemented!()
}
async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
unimplemented!()
}
async fn copy_if_not_exists(
&self,
_from: &Path,
_to: &Path,
) -> object_store::Result<()> {
unimplemented!()
}
}
impl VariableStream {
/// Creates a stub that yields `bytes_to_repeat` `max_iterations` times.
pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
Self {
bytes_to_repeat,
max_iterations,
iterations_detected: Arc::new(Mutex::new(0)),
}
}
/// Returns how many chunks have been produced by `get` streams so far.
pub fn get_iterations_detected(&self) -> usize {
*self.iterations_detected.lock().unwrap()
}
}
}