use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::batch_coalescer::coalescer::{CoalescerOptions, CoalescerStatus, SizedBatchCoalescer};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::{Result, Statistics};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties as _, PlanProperties,
};
use futures::ready;
use futures::stream::{Stream, StreamExt as _};
/// Physical plan node that runs the batches of its single input through a
/// [`SizedBatchCoalescer`] configured by [`CoalescerOptions`].
///
/// The `max_rows` option doubles as the node's fetch: it is what `fetch()`
/// reports and what `with_fetch()` overwrites.
#[derive(Debug, Clone)]
pub struct SizedCoalesceBatchesExec {
/// Child plan whose partitions are executed and re-batched.
input: Arc<dyn ExecutionPlan>,
/// Options cloned into the coalescer created for every executed partition.
coalescer_options: CoalescerOptions,
/// Metrics set from which the per-partition `BaselineMetrics` are created.
metrics: ExecutionPlanMetricsSet,
/// Plan properties derived from `input` when the node is constructed.
cache: PlanProperties,
}
impl SizedCoalesceBatchesExec {
    /// Builds a coalescing node on top of `input`.
    ///
    /// `coalescer_options` is stored as given and later cloned into the
    /// coalescer of every executed partition. The node starts with an empty
    /// metrics set and with plan properties copied from `input`.
    pub fn new(input: Arc<dyn ExecutionPlan>, coalescer_options: CoalescerOptions) -> Self {
        Self {
            // Evaluated first, while `input` can still be borrowed.
            cache: Self::compute_properties(&input),
            metrics: ExecutionPlanMetricsSet::new(),
            coalescer_options,
            input,
        }
    }

    /// Returns the node with the `max_rows` option replaced by `max_rows`
    /// (`None` clears it).
    pub fn with_max_rows(self, max_rows: Option<usize>) -> Self {
        let mut updated = self;
        updated.coalescer_options.max_rows = max_rows;
        updated
    }

    /// Derives this node's plan properties from `input`: equivalence
    /// properties, output partitioning, pipeline behavior and boundedness
    /// are all taken over unchanged.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        let equivalences = input.equivalence_properties().clone();
        let partitioning = input.output_partitioning().clone();
        let pipeline_behavior = input.pipeline_behavior();
        let boundedness = input.boundedness();
        PlanProperties::new(equivalences, partitioning, pipeline_behavior, boundedness)
    }
}
impl DisplayAs for SizedCoalesceBatchesExec {
    /// Writes the plan-display form of this node.
    ///
    /// * `Default` / `Verbose`: a single line with the node name, both batch
    ///   targets and, when set, `max_rows`.
    /// * `TreeRender`: the batch targets on one line, followed by `limit=`
    ///   when `max_rows` is set.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let options = &self.coalescer_options;
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "SizedCoalesceBatchesExec: target_batch_bytes={} target_batch_rows={}",
                    options.target_batch_bytes, options.target_batch_rows,
                )?;
                match options.max_rows {
                    Some(max_rows) => write!(f, ", max_rows={max_rows}"),
                    None => Ok(()),
                }
            }
            DisplayFormatType::TreeRender => {
                writeln!(
                    f,
                    "target_batch_bytes={} target_batch_rows={}",
                    options.target_batch_bytes, options.target_batch_rows
                )?;
                match options.max_rows {
                    Some(max_rows) => write!(f, "limit={max_rows}"),
                    None => Ok(()),
                }
            }
        }
    }
}
impl ExecutionPlan for SizedCoalesceBatchesExec {
    /// Static operator name used in plan displays.
    fn name(&self) -> &'static str {
        "SizedCoalesceBatchesExec"
    }

    /// Exposes the node as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Plan properties computed from the input at construction time.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// The single input plan.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Reports that the order of the single input is maintained.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    /// Reports that the node does not benefit from repartitioning its input.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    /// Rebuilds the node over a replacement child, keeping the coalescer
    /// options (including `max_rows`) and starting with a fresh metrics set.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `children` does not contain exactly one
    /// plan, instead of panicking on an empty vector or silently ignoring
    /// surplus children.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let child_count = children.len();
        let mut children = children.into_iter();
        let (Some(child), None) = (children.next(), children.next()) else {
            return Err(datafusion::common::DataFusionError::Internal(format!(
                "SizedCoalesceBatchesExec expects exactly one child, got {child_count}"
            )));
        };
        // The cloned options already carry `max_rows`, so no extra
        // `with_max_rows` call is needed.
        Ok(Arc::new(Self::new(child, self.coalescer_options.clone())))
    }

    /// Executes `partition` of the input and wraps the resulting stream in a
    /// coalescing stream.
    ///
    /// Every call creates its own coalescer (input schema plus a clone of the
    /// options) and its own baseline metrics for `partition`; the stream
    /// starts in the `Pull` state.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(SizedCoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            coalescer: SizedBatchCoalescer::new(
                self.input.schema(),
                self.coalescer_options.clone(),
            ),
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
            inner_state: CoalesceBatchesStreamState::Pull,
        }))
    }

    /// Snapshot of the metrics recorded so far.
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Statistics over all partitions; same as `partition_statistics(None)`.
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Input statistics for `partition`, adjusted by `Statistics::with_fetch`
    /// for the configured `max_rows`.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)?.with_fetch(
            self.coalescer_options.max_rows,
            0,
            1,
        )
    }

    /// Returns a copy of this node whose `max_rows` is set to `limit`.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(self.clone().with_max_rows(limit)))
    }

    /// The configured `max_rows`, if any.
    fn fetch(&self) -> Option<usize> {
        self.coalescer_options.max_rows
    }

    /// Effect of this node on the row count of its input.
    ///
    /// Without `max_rows` the node only re-batches, so the cardinality is
    /// unchanged. With `max_rows` set the stream can end early once the
    /// coalescer reports that the limit was reached, so the output may have
    /// fewer rows than the input.
    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.coalescer_options.max_rows.is_some() {
            CardinalityEffect::LowerEqual
        } else {
            CardinalityEffect::Equal
        }
    }

    /// Builds the filter description from the parent filters and this node's
    /// children; the node contributes no filters of its own.
    fn gather_filters_for_pushdown(
        &self,
        _phase: datafusion::physical_plan::filter_pushdown::FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &datafusion::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }

    /// Forwards the child's pushdown result via
    /// `FilterPushdownPropagation::if_all`.
    fn handle_child_pushdown_result(
        &self,
        _phase: datafusion::physical_plan::filter_pushdown::FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &datafusion::config::ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
}
/// Stream created by `SizedCoalesceBatchesExec::execute`: pulls batches from
/// `input`, pushes them into `coalescer` and emits the batches the coalescer
/// finishes.
struct SizedCoalesceBatchesStream {
/// Stream of the executed input partition.
input: SendableRecordBatchStream,
/// Buffers pushed batches until it reports a full batch or the end.
coalescer: SizedBatchCoalescer,
/// Records every poll result and the compute time spent in this stream.
baseline_metrics: BaselineMetrics,
/// Current step of the pull / emit / flush state machine.
inner_state: CoalesceBatchesStreamState,
}
impl Stream for SizedCoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    /// Drives the state machine once and records the outcome in the baseline
    /// metrics before handing it to the caller.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }

    /// Bounds on the number of batches still to be emitted.
    ///
    /// The input's hint cannot be forwarded:
    /// * its lower bound is too high, because input batches can be absorbed
    ///   without producing output and the stream stops pulling once the
    ///   coalescer reports the end;
    /// * its upper bound can be too low, because rows that are still buffered
    ///   are flushed after the input has already yielded everything.
    ///
    /// Hence only the always-valid `(0, None)` is reported.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
/// Steps of the state machine driven by
/// `SizedCoalesceBatchesStream::poll_next_inner`.
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
/// Poll the input and push the received batch into the coalescer.
Pull,
/// The coalescer reported `BatchFull`: emit the finished batch, then go back
/// to `Pull`.
ReturnBuffer,
/// The input ended or the coalescer reported `EndReached`: flush what is still
/// buffered, then end the stream.
Exhausted,
}
impl SizedCoalesceBatchesStream {
    /// Runs the pull / emit / flush state machine until it has something to
    /// report.
    ///
    /// * `Pull`: polls the input (returning `Pending` when it is not ready),
    ///   forwards input errors unchanged and pushes batches into the
    ///   coalescer, switching state according to the reported status.
    /// * `ReturnBuffer`: emits the finished batch and resumes pulling.
    /// * `Exhausted`: emits a final batch while the coalescer is non-empty,
    ///   otherwise ends the stream.
    ///
    /// Time spent after the input poll is added to the `elapsed_compute`
    /// metric.
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let compute_time = self.baseline_metrics.elapsed_compute().clone();
        loop {
            match &self.inner_state {
                CoalesceBatchesStreamState::Pull => {
                    let polled = ready!(self.input.poll_next_unpin(cx));
                    // Only the work done after the input poll counts as compute time.
                    let _timer = compute_time.timer();
                    let batch = match polled {
                        Some(Ok(batch)) => batch,
                        Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                        None => {
                            self.inner_state = CoalesceBatchesStreamState::Exhausted;
                            continue;
                        }
                    };
                    let next_state = match self.coalescer.push_batch(&batch) {
                        // Nothing to emit yet: keep pulling.
                        CoalescerStatus::Continue => continue,
                        CoalescerStatus::BatchFull => CoalesceBatchesStreamState::ReturnBuffer,
                        CoalescerStatus::EndReached => CoalesceBatchesStreamState::Exhausted,
                    };
                    self.inner_state = next_state;
                }
                CoalesceBatchesStreamState::ReturnBuffer => {
                    let _timer = compute_time.timer();
                    let finished = self.coalescer.finish_batch()?;
                    self.inner_state = CoalesceBatchesStreamState::Pull;
                    return Poll::Ready(Some(Ok(finished)));
                }
                CoalesceBatchesStreamState::Exhausted => {
                    if self.coalescer.is_empty() {
                        return Poll::Ready(None);
                    }
                    let _timer = compute_time.timer();
                    let remaining = self.coalescer.finish_batch()?;
                    return Poll::Ready(Some(Ok(remaining)));
                }
            }
        }
    }
}
impl RecordBatchStream for SizedCoalesceBatchesStream {
/// Schema of the emitted batches, as reported by the coalescer.
fn schema(&self) -> SchemaRef {
self.coalescer.schema()
}
}