use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream,
};
use arrow_schema::SchemaRef;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
LexOrdering, OrderingEquivalenceProperties, PhysicalSortExpr,
};
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::sync::Arc;
/// Execution plan for scanning one or more Arrow IPC files.
///
/// Projection of schema, statistics, and output ordering is computed once in
/// [`ArrowExec::new`] from the [`FileScanConfig`] and cached in the fields
/// below so the `ExecutionPlan` accessors are cheap.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ArrowExec {
    /// The full file-scan configuration this node was built from.
    base_config: FileScanConfig,
    /// Statistics restricted to the projected columns.
    projected_statistics: Statistics,
    /// Schema after the configured column projection is applied.
    projected_schema: SchemaRef,
    /// Known sort orderings of the projected output (may be empty).
    projected_output_ordering: Vec<LexOrdering>,
    /// Execution metrics collected while this plan runs.
    metrics: ExecutionPlanMetricsSet,
}
impl ArrowExec {
    /// Creates a new Arrow IPC scan operator, eagerly deriving the projected
    /// schema, statistics, and output orderings from `base_config`.
    pub fn new(base_config: FileScanConfig) -> Self {
        let (schema, statistics, orderings) = base_config.project();
        Self {
            base_config,
            projected_schema: schema,
            projected_statistics: statistics,
            projected_output_ordering: orderings,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Read-only access to the scan configuration this node was built from.
    pub fn base_config(&self) -> &FileScanConfig {
        &self.base_config
    }
}
impl ExecutionPlan for ArrowExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the stream this node produces (after projection).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }

    /// One output partition per configured file group.
    fn output_partitioning(&self) -> Partitioning {
        let group_count = self.base_config.file_groups.len();
        Partitioning::UnknownPartitioning(group_count)
    }

    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        Ok(self.base_config.infinite_source)
    }

    /// First known ordering of the projected output, if any.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.projected_output_ordering.first().map(Vec::as_slice)
    }

    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
        ordering_equivalence_properties_helper(
            self.schema(),
            &self.projected_output_ordering,
        )
    }

    /// Leaf node: reads files directly, so it has no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // No children to replace; return this node unchanged.
        Ok(self)
    }

    /// Builds a [`FileStream`] over the requested partition's file group,
    /// resolving the object store from the task context at execution time.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        use super::file_stream::FileStream;
        let store = context
            .runtime_env()
            .object_store(&self.base_config.object_store_url)?;
        let opener = ArrowOpener {
            object_store: store,
            projection: self.base_config.projection.clone(),
        };
        let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
        Ok(Box::pin(stream))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "ArrowExec: ")?;
        self.base_config.fmt_as(t, f)
    }

    fn statistics(&self) -> Statistics {
        self.projected_statistics.clone()
    }
}
/// [`FileOpener`] that decodes Arrow IPC files fetched from an [`ObjectStore`].
pub struct ArrowOpener {
    /// Store used to fetch the file bytes at open time.
    pub object_store: Arc<dyn ObjectStore>,
    /// Optional column indices to project while reading; `None` reads all columns.
    pub projection: Option<Vec<usize>>,
}
impl FileOpener for ArrowOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let object_store = self.object_store.clone();
let projection = self.projection.clone();
Ok(Box::pin(async move {
match object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let arrow_reader =
arrow::ipc::reader::FileReader::try_new(file, projection)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
r @ GetResult::Stream(_) => {
let bytes = r.bytes().await?;
let cursor = std::io::Cursor::new(bytes);
let arrow_reader =
arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
}
}))
}
}