datafusion 17.0.0

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model
//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
//! vectorized processing by upstream operators.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::error::Result;
use crate::physical_plan::{
    DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
    RecordBatchStream, SendableRecordBatchStream,

use crate::execution::context::TaskContext;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};
use log::trace;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
use super::{metrics::ExecutionPlanMetricsSet, Statistics};

/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalesces batches
    target_batch_size: usize,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,

impl CoalesceBatchesExec {
    /// Create a new CoalesceBatchesExec
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        Self {
            metrics: ExecutionPlanMetricsSet::new(),

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {

    /// Minimum number of rows for coalesces batches
    pub fn target_batch_size(&self) -> usize {

impl ExecutionPlan for CoalesceBatchesExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        // The coalesce batches operator does not make any changes to the schema of its input

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {

    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> Partitioning {
        // The coalesce batches operator does not make any changes to the partitioning of its input

    /// Specifies whether this plan generates an infinite stream of records.
    /// If the plan does not support pipelining, but it its input(s) are
    /// infinite, returns an error to indicate this.
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        // The coalesce batches operator does not make any changes to the sorting of its input

    fn equivalence_properties(&self) -> EquivalenceProperties {

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {

    fn execute(
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            schema: self.input.schema(),
            target_batch_size: self.target_batch_size,
            buffer: Vec::new(),
            buffered_rows: 0,
            is_closed: false,
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),

    fn fmt_as(
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                    "CoalesceBatchesExec: target_batch_size={}",

    fn metrics(&self) -> Option<MetricsSet> {

    fn statistics(&self) -> Statistics {

struct CoalesceBatchesStream {
    /// The input plan
    input: SendableRecordBatchStream,
    /// The input schema
    schema: SchemaRef,
    /// Minimum number of rows for coalesces batches
    target_batch_size: usize,
    /// Buffered batches
    buffer: Vec<RecordBatch>,
    /// Buffered row count
    buffered_rows: usize,
    /// Whether the stream has finished returning all of its data or not
    is_closed: bool,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,

impl Stream for CoalesceBatchesStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);

    fn size_hint(&self) -> (usize, Option<usize>) {
        // we can't predict the size of incoming batches so re-use the size hint from the input

impl CoalesceBatchesStream {
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
        // Get a clone (uses same underlying atomic) as self gets borrowed below
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();

        if self.is_closed {
            return Poll::Ready(None);
        loop {
            let input_batch = self.input.poll_next_unpin(cx);
            // records time on drop
            let _timer = cloned_time.timer();
            match input_batch {
                Poll::Ready(x) => match x {
                    Some(Ok(ref batch)) => {
                        if batch.num_rows() >= self.target_batch_size
                            && self.buffer.is_empty()
                            return Poll::Ready(Some(Ok(batch.clone())));
                        } else if batch.num_rows() == 0 {
                            // discard empty batches
                        } else {
                            // add to the buffered batches
                            self.buffered_rows += batch.num_rows();
                            // check to see if we have enough batches yet
                            if self.buffered_rows >= self.target_batch_size {
                                // combine the batches and return
                                let batch = concat_batches(
                                // reset buffer state
                                self.buffered_rows = 0;
                                // return batch
                                return Poll::Ready(Some(Ok(batch)));
                    None => {
                        self.is_closed = true;
                        // we have reached the end of the input stream but there could still
                        // be buffered batches
                        if self.buffer.is_empty() {
                            return Poll::Ready(None);
                        } else {
                            // combine the batches and return
                            let batch = concat_batches(
                            // reset buffer state
                            self.buffered_rows = 0;
                            // return batch
                            return Poll::Ready(Some(Ok(batch)));
                    other => return Poll::Ready(other),
                Poll::Pending => return Poll::Pending,

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {

/// Concatenates an array of `RecordBatch` into one batch
pub fn concat_batches(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    row_count: usize,
) -> ArrowResult<RecordBatch> {
        "Combined {} batches containing {} rows",
    let b = arrow::compute::concat_batches(schema, batches)?;

mod tests {
    use super::*;
    use crate::config::ConfigOptions;
    use crate::datasource::MemTable;
    use crate::physical_plan::filter::FilterExec;
    use crate::physical_plan::projection::ProjectionExec;
    use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
    use crate::prelude::SessionContext;
    use crate::test::create_vec_batches;
    use arrow::datatypes::{DataType, Field, Schema};

    async fn test_custom_batch_size() -> Result<()> {
        let mut config = ConfigOptions::new();
        config.execution.batch_size = 1234;

        let ctx = SessionContext::with_config(config.into());
        let plan = create_physical_plan(ctx).await?;
        let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
        let coalesce = projection
        assert_eq!(1234, coalesce.target_batch_size);

    async fn test_disable_coalesce() -> Result<()> {
        let mut config = ConfigOptions::new();
        config.execution.coalesce_batches = false;

        let ctx = SessionContext::with_config(config.into());
        let plan = create_physical_plan(ctx).await?;
        let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
        // projection should directly wrap filter with no coalesce step
        let _filter = projection

    async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = test_schema();
        let partition = create_vec_batches(&schema, 10);
        let table = MemTable::try_new(schema, vec![partition])?;
        ctx.register_table("a", Arc::new(table))?;
        let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_concat_batches() -> Result<()> {
        let schema = test_schema();
        let partition = create_vec_batches(&schema, 10);
        let partitions = vec![partition];

        let output_partitions = coalesce_batches(&schema, partitions, 21).await?;
        assert_eq!(1, output_partitions.len());

        // input is 10 batches x 8 rows (80 rows)
        // expected output is batches of at least 20 rows (except for the final batch)
        let batches = &output_partitions[0];
        assert_eq!(4, batches.len());
        assert_eq!(24, batches[0].num_rows());
        assert_eq!(24, batches[1].num_rows());
        assert_eq!(24, batches[2].num_rows());
        assert_eq!(8, batches[3].num_rows());


    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))

    async fn coalesce_batches(
        schema: &SchemaRef,
        input_partitions: Vec<Vec<RecordBatch>>,
        target_batch_size: usize,
    ) -> Result<Vec<Vec<RecordBatch>>> {
        // create physical plan
        let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
        let exec =
            RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?;
        let exec: Arc<dyn ExecutionPlan> =
            Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));

        // execute and collect results
        let output_partition_count = exec.output_partitioning().partition_count();
        let mut output_partitions = Vec::with_capacity(output_partition_count);
        let session_ctx = SessionContext::new();
        for i in 0..output_partition_count {
            // execute this *output* partition and collect all batches
            let task_ctx = session_ctx.task_ctx();
            let mut stream = exec.execute(i, task_ctx.clone())?;
            let mut batches = vec![];
            while let Some(result) = {