/// Default upper bound (1000) on the number of records examined during
/// schema inference, going by the constant's name.
///
/// NOTE(review): the code that consumes this value lives outside this file;
/// confirm the exact semantics (per file vs. per table) at the use sites.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
// Public submodules of this module. Their names suggest one module per
// supported file format plus shared option and write helpers; their contents
// are defined in separate files and are not visible here.
pub mod arrow;
pub mod avro;
pub mod csv;
pub mod json;
pub mod options;
pub mod parquet;
pub mod write;
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
/// Abstraction over a file format that can be scanned and, optionally, written.
///
/// Implementors must provide schema inference, statistics inference and
/// scan-plan construction. Writing is opt-in: `create_writer_physical_plan`
/// has a default body that reports "not implemented".
///
/// The trait requires `Send + Sync + fmt::Debug`, and its async methods are
/// provided through the `#[async_trait]` attribute.
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
/// Returns this format as a `&dyn Any`.
///
/// Presumably this lets callers downcast to the concrete format type.
fn as_any(&self) -> &dyn Any;
/// Infers a single schema for the given `objects`, which are accessed
/// through `store`.
///
/// One `SchemaRef` is returned for the whole slice; how the schemas of
/// several objects are reconciled is decided by each implementor and is not
/// visible in this file.
///
/// # Errors
/// Returns whatever error the implementor produces.
async fn infer_schema(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef>;
/// Infers `Statistics` for one `object`, given the `table_schema` it is
/// expected to follow.
///
/// # Errors
/// Returns whatever error the implementor produces.
async fn infer_stats(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
/// Builds the `ExecutionPlan` that scans the files described by `conf`.
///
/// `filters` is an optional physical expression; whether and how it is used
/// is up to the implementor.
///
/// # Errors
/// Returns whatever error the implementor produces.
async fn create_physical_plan(
&self,
state: &SessionState,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>>;
/// Builds the `ExecutionPlan` that writes the output of `_input` according
/// to `_conf`.
///
/// The default implementation ignores all of its arguments and returns the
/// error produced by `not_impl_err!` with the message
/// "Writer not implemented for this format". Formats that support writing
/// override this method.
async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
/// Returns the `FileType` associated with this format.
fn file_type(&self) -> FileType;
}
#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
use std::sync::Mutex;
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
};
use tokio::io::AsyncWrite;
pub async fn scan_format(
state: &SessionState,
format: &dyn FileFormat,
store_root: &str,
file_name: &str,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));
let file_schema = format.infer_schema(state, &store, &[meta.clone()]).await?;
let statistics = format
.infer_stats(state, &store, file_schema.clone(), &meta)
.await?;
let file_groups = vec![vec![PartitionedFile {
object_meta: meta,
partition_values: vec![],
range: None,
extensions: None,
}]];
let exec = format
.create_physical_plan(
state,
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
statistics,
projection,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
)
.await?;
Ok(exec)
}
/// Test-only `ObjectStore` whose `get` returns a stream that repeats a fixed
/// chunk of bytes and counts how many chunks were actually produced.
#[derive(Debug)]
pub struct VariableStream {
/// Chunk emitted by every item of the stream returned from `get`.
bytes_to_repeat: Bytes,
/// Maximum number of chunks the stream returned from `get` will yield.
max_iterations: usize,
/// Shared counter, incremented each time the stream produces a chunk.
iterations_detected: Arc<Mutex<usize>>,
}
/// Renders the store as the fixed label `VariableStream`; no field values are
/// included in the output.
impl std::fmt::Display for VariableStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("VariableStream")
    }
}
/// `ObjectStore` implementation for the `VariableStream` test double.
///
/// Only `get` is functional. Every other method panics via
/// `unimplemented!()`.
#[async_trait]
impl ObjectStore for VariableStream {
    /// Unsupported; panics via `unimplemented!()`.
    async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn put_multipart(
        &self,
        _location: &Path,
    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn abort_multipart(
        &self,
        _location: &Path,
        _multipart_id: &MultipartId,
    ) -> object_store::Result<()> {
        unimplemented!()
    }

    /// Returns a streaming result that yields the configured chunk up to
    /// `max_iterations` times.
    ///
    /// The stream is lazy: the shared counter is bumped only when a chunk is
    /// actually pulled. This lets a test observe how much of the object a
    /// reader consumed.
    ///
    /// The reported object size is `chunk length * max_iterations`. The
    /// `location` is echoed back in the returned metadata.
    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
        let chunk = self.bytes_to_repeat.clone();
        let total_size = chunk.len() * self.max_iterations;
        let counter = Arc::clone(&self.iterations_detected);

        let chunks = futures::stream::repeat_with(move || {
            // Count every chunk that is pulled from the stream.
            *counter.lock().unwrap() += 1;
            Ok(chunk.clone())
        })
        .take(self.max_iterations)
        .boxed();

        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: Default::default(),
            size: total_size,
            e_tag: None,
        };

        Ok(GetResult {
            payload: GetResultPayload::Stream(chunks),
            meta,
            // NOTE(review): the range is left at its default value rather than
            // `0..total_size`; confirm no consumer relies on it.
            range: Default::default(),
        })
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn get_opts(
        &self,
        _location: &Path,
        _opts: GetOptions,
    ) -> object_store::Result<GetResult> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn get_ranges(
        &self,
        _location: &Path,
        _ranges: &[Range<usize>],
    ) -> object_store::Result<Vec<Bytes>> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn delete(&self, _location: &Path) -> object_store::Result<()> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn list(
        &self,
        _prefix: Option<&Path>,
    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn list_with_delimiter(
        &self,
        _prefix: Option<&Path>,
    ) -> object_store::Result<ListResult> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        unimplemented!()
    }

    /// Unsupported; panics via `unimplemented!()`.
    async fn copy_if_not_exists(
        &self,
        _from: &Path,
        _to: &Path,
    ) -> object_store::Result<()> {
        unimplemented!()
    }
}
impl VariableStream {
    /// Creates a store whose `get` stream yields `bytes_to_repeat` at most
    /// `max_iterations` times. The produced-chunk counter starts at zero.
    pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
        let iterations_detected = Arc::new(Mutex::new(0_usize));
        Self {
            bytes_to_repeat,
            max_iterations,
            iterations_detected,
        }
    }

    /// Returns the current value of the shared counter, i.e. how many chunks
    /// have been produced so far by streams handed out from `get`.
    ///
    /// # Panics
    /// Panics if the counter's mutex is poisoned.
    pub fn get_iterations_detected(&self) -> usize {
        let guard = self.iterations_detected.lock().unwrap();
        *guard
    }
}
}