use std::any::Any;
use std::sync::Arc;
use super::{DisplayAs, DisplayFormatType};
use crate::display::{OutputOrderingDisplay, ProjectSchemaDisplay};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
use arrow_schema::Schema;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
use async_trait::async_trait;
use futures::stream::StreamExt;
use log::debug;
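/// A partition that can be converted into a [`SendableRecordBatchStream`].
///
/// Below is a minimal sketch of an implementation, for illustration only:
/// `InMemoryPartition` is not part of this crate, and the
/// `datafusion_physical_plan` paths are assumptions about where this module
/// lives. It replays a fixed set of in-memory batches through
/// [`RecordBatchStreamAdapter`]:
///
/// ```ignore
/// use std::sync::Arc;
///
/// use arrow::datatypes::SchemaRef;
/// use arrow::record_batch::RecordBatch;
/// use datafusion_execution::TaskContext;
/// use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// use datafusion_physical_plan::streaming::PartitionStream;
/// use datafusion_physical_plan::SendableRecordBatchStream;
///
/// // Hypothetical partition that replays in-memory batches as a stream
/// struct InMemoryPartition {
///     schema: SchemaRef,
///     batches: Vec<RecordBatch>,
/// }
///
/// impl PartitionStream for InMemoryPartition {
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
///         // Wrap an iterator of Ok(batch) values in an adapter that carries the schema
///         Box::pin(RecordBatchStreamAdapter::new(
///             self.schema.clone(),
///             futures::stream::iter(self.batches.clone().into_iter().map(Ok)),
///         ))
///     }
/// }
/// ```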
pub trait PartitionStream: Send + Sync {
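/// Returns the schema of this partition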
fn schema(&self) -> &SchemaRef;
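/// Returns a stream yielding this partition's record batches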
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
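/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.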
pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<Arc<[usize]>>,
projected_schema: SchemaRef,
projected_output_ordering: Vec<LexOrdering>,
infinite: bool,
}
impl StreamingTableExec {
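/// Try to create a new [`StreamingTableExec`], returning an error if any
/// partition does not have exactly the provided `schema`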
pub fn try_new(
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<&Vec<usize>>,
projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
infinite: bool,
) -> Result<Self> {
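// Every partition must produce batches with the declared (unprojected) schema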
for x in partitions.iter() {
let partition_schema = x.schema();
if !schema.eq(partition_schema) {
debug!(
"Target schema does not match with partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return plan_err!("Mismatch between schema and batches");
}
}
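// Apply the optional projection to derive the schema this plan exposes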
let projected_schema = match projection {
Some(p) => Arc::new(schema.project(p)?),
None => schema,
};
Ok(Self {
partitions,
projected_schema,
projection: projection.cloned().map(Into::into),
projected_output_ordering: projected_output_ordering.into_iter().collect(),
infinite,
})
}
pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
&self.partitions
}
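/// Returns the (unprojected) schema of the partition streams.
///
/// Panics if the plan has no partitions.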
pub fn partition_schema(&self) -> &SchemaRef {
self.partitions[0].schema()
}
pub fn projection(&self) -> &Option<Arc<[usize]>> {
&self.projection
}
pub fn projected_schema(&self) -> &Schema {
&self.projected_schema
}
pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
self.projected_output_ordering.clone()
}
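/// Returns true if the partition streams are unbounded (never end)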
pub fn is_infinite(&self) -> bool {
self.infinite
}
}
impl std::fmt::Debug for StreamingTableExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
}
}
impl DisplayAs for StreamingTableExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"StreamingTableExec: partition_sizes={:?}",
self.partitions.len(),
)?;
if !self.projected_schema.fields().is_empty() {
write!(
f,
", projection={}",
ProjectSchemaDisplay(&self.projected_schema)
)?;
}
if self.infinite {
write!(f, ", infinite_source=true")?;
}
self.projected_output_ordering
.first()
.map_or(Ok(()), |ordering| {
if !ordering.is_empty() {
write!(
f,
", output_ordering={}",
OutputOrderingDisplay(ordering)
)?;
}
Ok(())
})
}
}
}
}
#[async_trait]
impl ExecutionPlan for StreamingTableExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.partitions.len())
}
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(self.infinite)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
.map(|ordering| ordering.as_slice())
}
fn equivalence_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
self.schema(),
&self.projected_output_ordering,
)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
internal_err!("Children cannot be replaced in {self:?}")
}
}
fn execute(
&self,
partition: usize,
ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let stream = self.partitions[partition].execute(ctx);
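// If a projection was specified, project each batch as it flows through;
// otherwise return the partition stream unchanged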
Ok(match self.projection.clone() {
Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
self.projected_schema.clone(),
stream.map(move |x| {
x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
}),
)),
None => stream,
})
}
}