/// Default maximum number of records used for schema inference, as the
/// constant's name states.
// NOTE(review): the code that consumes this limit is not in this section —
// confirm the exact scan semantics at the use sites.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
// Public format-specific submodules; each is declared here and defined in its
// own source file.
pub mod avro;
pub mod csv;
pub mod json;
pub mod parquet;
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
/// Abstraction over a file format that can be inspected and scanned.
///
/// Implementors must be `Send + Sync` and implement [`fmt::Debug`]. The
/// `#[async_trait]` attribute is what allows the `async fn` methods below to
/// be declared on the trait.
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
/// Returns `self` as a [`Any`] reference so that callers can downcast to
/// the concrete format type.
fn as_any(&self) -> &dyn Any;
/// Infers a schema for the given `objects`, which are reachable through
/// `store`.
///
/// Returns the inferred schema, or an error if inference fails.
// NOTE(review): how schemas of several objects are combined is up to each
// implementation and is not visible here — confirm per format.
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef>;
/// Infers [`Statistics`] for a single `object` reachable through `store`.
///
/// `table_schema` is supplied by the caller alongside the object.
// NOTE(review): presumably `table_schema` is the schema the statistics
// should be expressed against — confirm with the implementations.
async fn infer_stats(
&self,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
/// Builds the [`ExecutionPlan`] that scans the files described by `conf`.
///
/// `filters` is a slice of expressions handed to the implementation.
// NOTE(review): presumably these are predicates an implementation may use
// to reduce the data it reads — confirm whether they are advisory.
async fn create_physical_plan(
&self,
conf: FileScanConfig,
filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>>;
}
#[cfg(test)]
pub(crate) mod test_util {
    use super::*;
    use crate::datasource::listing::PartitionedFile;
    use crate::datasource::object_store::ObjectStoreUrl;
    use crate::test::object_store::local_unpartitioned_file;
    use object_store::local::LocalFileSystem;

    /// Test helper that produces the physical plan `format` creates for one
    /// file on the local filesystem.
    ///
    /// The file is addressed as `"{store_root}/{file_name}"`. Its schema and
    /// statistics are first inferred through `format`, then a single-file,
    /// single-group [`FileScanConfig`] carrying `projection` and `limit` is
    /// handed to [`FileFormat::create_physical_plan`] with no filters.
    ///
    /// Any error raised by schema inference, statistics inference or plan
    /// creation is returned unchanged.
    pub async fn scan_format(
        format: &dyn FileFormat,
        store_root: &str,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The trait methods take the store as a trait object, so spell the
        // type out here instead of relying on an inferred cast.
        let local_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());

        let location = format!("{}/{}", store_root, file_name);
        let object_meta = local_unpartitioned_file(location);

        // Schema inference works on a slice of objects; give it a one-element
        // copy so `object_meta` can still be moved into the scan config below.
        let probe = [object_meta.clone()];
        let file_schema = format.infer_schema(&local_store, &probe).await?;

        let statistics = format
            .infer_stats(&local_store, file_schema.clone(), &object_meta)
            .await?;

        // One file group containing exactly the one unpartitioned file.
        let only_file = PartitionedFile {
            object_meta,
            partition_values: Vec::new(),
            range: None,
            extensions: None,
        };
        let file_groups = vec![vec![only_file]];

        let scan_config = FileScanConfig {
            object_store_url: ObjectStoreUrl::local_filesystem(),
            file_schema,
            file_groups,
            statistics,
            projection,
            limit,
            table_partition_cols: Vec::new(),
        };

        format.create_physical_plan(scan_config, &[]).await
    }
}