use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
with_new_children_if_necessary,
},
};
use std::sync::Arc;
/// Physical optimizer rule that places a `CoalesceBatchesExec` on top of
/// selected operators in a physical plan. Each inserted node is created with
/// this rule's `target_batch_size`.
///
/// The derived [`Default`] yields a `target_batch_size` of `0`. Prefer
/// [`CoalesceBatches::new`] to choose an explicit target.
#[derive(Debug, Default)]
pub struct CoalesceBatches {
    /// Batch size passed to every `CoalesceBatchesExec` this rule inserts.
    target_batch_size: usize,
}

impl CoalesceBatches {
    /// Creates the rule with the given target batch size.
    ///
    /// The value is forwarded unchanged to each `CoalesceBatchesExec`
    /// created by the rule.
    pub fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size }
    }
}
impl PhysicalOptimizerRule for CoalesceBatches {
    /// Rewrites `plan` bottom-up, wrapping every `FilterExec`, `HashJoinExec`
    /// and `RepartitionExec` node in a `CoalesceBatchesExec` that uses this
    /// rule's `target_batch_size`.
    ///
    /// Leaf nodes (no children) are returned unchanged and are never wrapped.
    ///
    /// # Errors
    ///
    /// Propagates any error from optimizing a child subtree or from
    /// `with_new_children_if_necessary`.
    fn optimize(
        &self,
        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
        config: &crate::execution::context::SessionConfig,
    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
        // Fetch the children once; `children()` is computed on each call.
        let children = plan.children();
        if children.is_empty() {
            // `plan` is already owned, so no clone is needed.
            return Ok(plan);
        }

        // Optimize children first so that wrapping happens bottom-up; the
        // first error short-circuits the collection.
        let new_children = children
            .iter()
            .map(|child| self.optimize(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;
        let plan = with_new_children_if_necessary(plan, new_children)?;

        // Only these operator types get a coalescing node above them.
        let plan_any = plan.as_any();
        let wrap_in_coalesce = plan_any.is::<FilterExec>()
            || plan_any.is::<HashJoinExec>()
            || plan_any.is::<RepartitionExec>();

        Ok(if wrap_in_coalesce {
            Arc::new(CoalesceBatchesExec::new(plan, self.target_batch_size))
        } else {
            plan
        })
    }

    /// Identifier of this optimizer rule.
    fn name(&self) -> &str {
        "coalesce_batches"
    }
}