use crate::datasource::file_format::FileFormat;
use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
use arrow::ipc::reader::FileReader;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::{FileType, Statistics};
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
use std::any::Any;
use std::io::{Read, Seek};
use std::sync::Arc;
/// Marker type for the Arrow file format.
///
/// The struct carries no state; all behaviour comes from its `FileFormat`
/// implementation in this file.
#[derive(Debug, Default)]
pub struct ArrowFormat;
#[async_trait]
impl FileFormat for ArrowFormat {
    /// Exposes this format as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Infers one schema for `objects` by reading the schema of every object
    /// and merging the results with `Schema::try_merge`.
    ///
    /// # Errors
    ///
    /// Propagates any error from fetching an object, from reading its schema,
    /// or from merging the collected schemas.
    async fn infer_schema(
        &self,
        _state: &SessionState,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut file_schemas = Vec::with_capacity(objects.len());

        for meta in objects {
            let response = store.as_ref().get(&meta.location).await?;

            let file_schema = match response.payload {
                GetResultPayload::Stream(_) => {
                    // The schema reader needs `Read + Seek`, so the body is
                    // buffered in memory and read through a cursor.
                    let buffered = response.bytes().await?;
                    let mut cursor = std::io::Cursor::new(&buffered);
                    read_arrow_schema_from_reader(&mut cursor)?
                }
                GetResultPayload::File(mut local_file, _) => {
                    // A local file handle can be handed to the reader directly.
                    read_arrow_schema_from_reader(&mut local_file)?
                }
            };

            // `try_merge` consumes owned `Schema` values, hence the clone out
            // of the shared reference.
            file_schemas.push((*file_schema).clone());
        }

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Returns `Statistics::default()`; none of the arguments are inspected.
    async fn infer_stats(
        &self,
        _state: &SessionState,
        _store: &Arc<dyn ObjectStore>,
        _table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::default())
    }

    /// Builds an `ArrowExec` from `conf` and returns it as the physical plan.
    ///
    /// The session state and the optional filter expression are not used.
    async fn create_physical_plan(
        &self,
        _state: &SessionState,
        conf: FileScanConfig,
        _filters: Option<&Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ArrowExec::new(conf)))
    }

    /// Identifies this format as `FileType::ARROW`.
    fn file_type(&self) -> FileType {
        FileType::ARROW
    }
}
/// Opens `reader` with an Arrow IPC `FileReader` (no column projection) and
/// returns the schema that the reader reports.
///
/// # Errors
///
/// Returns the error raised by `FileReader::try_new` if the reader cannot be
/// constructed from the input.
fn read_arrow_schema_from_reader<R>(reader: R) -> Result<SchemaRef>
where
    R: Read + Seek,
{
    Ok(FileReader::try_new(reader, None)?.schema())
}