use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::projection::ProjectionExec;
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
check_if_same_properties,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::sort_pushdown::SortOrderPushdownResult;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::ready;
use futures::stream::{Stream, StreamExt};
/// Combines an input stream of small [`RecordBatch`]es into batches of
/// (up to) `target_batch_size` rows, optionally stopping after `fetch`
/// rows have been produced.
#[deprecated(
    since = "52.0.0",
    note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator"
)]
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The child plan whose output batches are coalesced
    input: Arc<dyn ExecutionPlan>,
    /// Desired number of rows per output batch
    target_batch_size: usize,
    /// Maximum total number of rows to emit (`None` = unlimited)
    fetch: Option<usize>,
    /// Execution metrics, populated per-partition at `execute` time
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (copied from the input; see `compute_properties`)
    cache: Arc<PlanProperties>,
}
#[expect(deprecated)]
impl CoalesceBatchesExec {
    /// Create a new `CoalesceBatchesExec` producing batches of
    /// `target_batch_size` rows from `input`, with no row limit.
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        let properties = Self::compute_properties(&input);
        Self {
            cache: Arc::new(properties),
            metrics: ExecutionPlanMetricsSet::new(),
            fetch: None,
            target_batch_size,
            input,
        }
    }

    /// Set the maximum number of rows to produce (`None` means unlimited).
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The desired number of rows per output batch
    pub fn target_batch_size(&self) -> usize {
        self.target_batch_size
    }

    /// Coalescing changes batch boundaries only, so every plan property is
    /// inherited from the input unchanged.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        PlanProperties::new(
            input.equivalence_properties().clone(),
            input.output_partitioning().clone(),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }

    /// Build a copy with a replaced child, reusing the cached properties and
    /// resetting the metrics (callers are expected to have verified the new
    /// child has the same properties).
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..self.clone()
        }
    }
}
#[expect(deprecated)]
impl DisplayAs for CoalesceBatchesExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            // Tree rendering: one attribute per line
            DisplayFormatType::TreeRender => {
                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
                if let Some(fetch) = self.fetch {
                    write!(f, "limit={fetch}")?;
                }
            }
            // Default and verbose rendering share a single-line form
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "CoalesceBatchesExec: target_batch_size={}",
                    self.target_batch_size,
                )?;
                if let Some(fetch) = self.fetch {
                    write!(f, ", fetch={fetch}")?;
                }
            }
        }
        Ok(())
    }
}
#[expect(deprecated)]
impl ExecutionPlan for CoalesceBatchesExec {
    fn name(&self) -> &'static str {
        "CoalesceBatchesExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        // Properties are computed once in `new` and cached
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Coalescing concatenates rows in arrival order, so ordering is kept
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): presumably this macro returns early (reusing cached
        // properties) when the new child's properties match — confirm against
        // the macro definition
        check_if_same_properties!(self, children);
        Ok(Arc::new(
            CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
                .with_fetch(self.fetch),
        ))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Wrap the input partition's stream in a coalescing stream that
        // buffers rows up to `target_batch_size` and enforces `fetch`
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            coalescer: LimitedBatchCoalescer::new(
                self.input.schema(),
                self.target_batch_size,
                self.fetch,
            ),
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
            completed: false,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // Input statistics adjusted for the fetch limit
        // NOTE(review): the `0, 1` arguments look like skip/partition-count —
        // confirm against `Statistics::with_fetch`
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        // Same plan with only the fetch replaced; metrics and cached
        // properties are shared with `self`
        Some(Arc::new(CoalesceBatchesExec {
            input: Arc::clone(&self.input),
            target_batch_size: self.target_batch_size,
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: Arc::clone(&self.cache),
        }))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // Batch boundaries change but the total row count does not
        // (any `fetch` truncation notwithstanding)
        CardinalityEffect::Equal
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Delegate to the input; if it absorbed the projection, rebuild this
        // node on top of the new input
        match self.input.try_swapping_with_projection(projection)? {
            Some(new_input) => Ok(Some(
                Arc::new(self.clone()).with_new_children(vec![new_input])?,
            )),
            None => Ok(None),
        }
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Filters pass straight through to the (single) child
        FilterDescription::from_children(parent_filters, &self.children())
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }

    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // If the input accepts the sort order, rebuild this node on top of
        // the reordered input, keeping target batch size and fetch
        self.input.try_pushdown_sort(order)?.try_map(|new_input| {
            Ok(Arc::new(
                CoalesceBatchesExec::new(new_input, self.target_batch_size)
                    .with_fetch(self.fetch),
            ) as Arc<dyn ExecutionPlan>)
        })
    }
}
/// Stream adapter that feeds input batches through a [`LimitedBatchCoalescer`]
/// and yields the completed (coalesced) batches.
struct CoalesceBatchesStream {
    /// The source stream of input batches
    input: SendableRecordBatchStream,
    /// Buffers rows and produces batches of the target size, enforcing fetch
    coalescer: LimitedBatchCoalescer,
    /// Tracks output rows and compute time for this partition
    baseline_metrics: BaselineMetrics,
    /// Set once the input is exhausted or the row limit was reached
    completed: bool,
}
impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Poll the inner state machine, then route the result through the
        // baseline metrics bookkeeping. Two statements are required: folding
        // them into one call would borrow `self` twice.
        let inner = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(inner)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Reuse the input's hint; coalescing may emit fewer (larger) batches,
        // so this is only an upper-bound-style estimate.
        self.input.size_hint()
    }
}
impl CoalesceBatchesStream {
    /// Core poll loop: drains completed batches from the coalescer, pulling
    /// more input (and honoring the row limit) until one is ready or the
    /// stream ends.
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
        loop {
            // First, hand out anything the coalescer has already finished.
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Poll::Ready(Some(Ok(batch)));
            }
            // Nothing buffered and no more input: the stream is done.
            if self.completed {
                return Poll::Ready(None);
            }
            let next = ready!(self.input.poll_next_unpin(cx));
            // Start timing only after the input poll so upstream wait time is
            // not counted as this operator's compute time.
            let _timer = elapsed_compute.timer();
            match next {
                Some(Ok(batch)) => {
                    // Push the batch in; stop early if the fetch limit is hit.
                    if matches!(
                        self.coalescer.push_batch(batch)?,
                        PushBatchStatus::LimitReached
                    ) {
                        self.completed = true;
                        self.coalescer.finish()?;
                    }
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => {
                    // Input exhausted: flush whatever remains buffered.
                    self.completed = true;
                    self.coalescer.finish()?;
                }
            }
        }
    }
}
impl RecordBatchStream for CoalesceBatchesStream {
    // The schema the coalescer was built with — the input plan's schema
    // (see `CoalesceBatchesExec::execute`)
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}