use std::any::Any;
use std::sync::Arc;
use crate::avro_to_arrow::Reader as AvroReader;
use datafusion_common::error::Result;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::projection::ProjectionExprs;
use object_store::ObjectStore;
/// A [`FileSource`] implementation for reading Apache Avro files.
#[derive(Clone)]
pub struct AvroSource {
    /// Schema of the table being scanned (file schema plus whatever
    /// table-level information [`TableSchema`] carries).
    table_schema: TableSchema,
    /// Rows per produced record batch; `None` until set via
    /// `with_batch_size`, and `open` panics if still unset.
    batch_size: Option<usize>,
    /// Projection split into file-level column indices and the remaining
    /// source-level projection expressions.
    projection: SplitProjection,
    /// Execution metrics shared with the physical plan.
    metrics: ExecutionPlanMetricsSet,
}
impl AvroSource {
pub fn new(table_schema: impl Into<TableSchema>) -> Self {
let table_schema = table_schema.into();
Self {
projection: SplitProjection::unprojected(&table_schema),
table_schema,
batch_size: None,
metrics: ExecutionPlanMetricsSet::new(),
}
}
fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
let file_schema = self.table_schema.file_schema();
let projection = Some(
self.projection
.file_indices
.iter()
.map(|&idx| file_schema.field(idx).name().clone())
.collect::<Vec<_>>(),
);
AvroReader::try_new(
reader,
&Arc::clone(self.table_schema.file_schema()),
self.batch_size.expect("Batch size must set before open"),
projection.as_ref(),
)
}
}
impl FileSource for AvroSource {
    /// Create the opener that reads Avro files for this source, wrapped in a
    /// [`ProjectionOpener`] that applies the source-level projection to each
    /// produced batch.
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        _base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let inner = Arc::new(private::AvroOpener {
            config: Arc::new(self.clone()),
            object_store,
        }) as Arc<dyn FileOpener>;
        // Move `inner` into the wrapper directly; the previous
        // `let mut` + `Arc::clone(&opener)` + reassignment cloned the Arc
        // only to immediately drop the original.
        Ok(ProjectionOpener::try_new(
            self.projection.clone(),
            inner,
            self.table_schema.file_schema(),
        )?)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the table this source scans.
    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    /// Return a copy of this source configured with `batch_size` rows per
    /// batch. Required before any file can be opened.
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    /// Attempt to merge `projection` into this source's existing projection,
    /// returning a new source that performs the combined projection.
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        // Compose the incoming projection with the one already pushed down,
        // then re-split it into file-level indices + source-level exprs.
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    /// The source-level projection currently applied by this source.
    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "avro"
    }

    /// Avro files are not splittable, so repartitioning is never performed.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
        _config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        Ok(None)
    }
}
mod private {
    use super::*;
    use bytes::Buf;
    use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture};
    use futures::StreamExt;
    use object_store::{GetResultPayload, ObjectStore, ObjectStoreExt};

    /// [`FileOpener`] that fetches Avro files from an [`ObjectStore`] and
    /// decodes them with the configured [`AvroSource`].
    pub struct AvroOpener {
        pub config: Arc<AvroSource>,
        pub object_store: Arc<dyn ObjectStore>,
    }

    impl FileOpener for AvroOpener {
        fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let source = Arc::clone(&self.config);
            let store = Arc::clone(&self.object_store);
            Ok(Box::pin(async move {
                let get_result = store
                    .get(&partitioned_file.object_meta.location)
                    .await?;
                match get_result.payload {
                    // Local file: hand the `File` handle straight to the
                    // Avro reader.
                    GetResultPayload::File(file, _) => {
                        let batches = source.open(file)?;
                        Ok(futures::stream::iter(batches)
                            .map(|batch| batch.map_err(Into::into))
                            .boxed())
                    }
                    // Remote object: buffer the whole payload in memory,
                    // then decode from the buffered bytes. The `_` pattern
                    // binds nothing, so `get_result` is still intact here.
                    GetResultPayload::Stream(_) => {
                        let data = get_result.bytes().await?;
                        let batches = source.open(data.reader())?;
                        Ok(futures::stream::iter(batches)
                            .map(|batch| batch.map_err(Into::into))
                            .boxed())
                    }
                }
            }))
        }
    }
}