use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
},
};
use std::sync::Arc;
/// Physical optimizer rule that wraps selected operators (filters, hash joins and
/// repartitions) in a [`CoalesceBatchesExec`], registered under the name
/// `"coalesce_batches"`.
///
/// The rule is stateless; construct it with [`CoalesceBatches::new`] or
/// [`Default::default`].
#[derive(Debug, Default)]
pub struct CoalesceBatches {}

impl CoalesceBatches {
    /// Creates a new instance of the `CoalesceBatches` optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
impl PhysicalOptimizerRule for CoalesceBatches {
    /// Recursively rewrites `plan` bottom-up, wrapping every [`FilterExec`],
    /// [`HashJoinExec`] and [`RepartitionExec`] node in a [`CoalesceBatchesExec`].
    ///
    /// Children are optimized before their parent, so matching operators anywhere in
    /// the tree are wrapped, not only the root. Leaf nodes (no children) are returned
    /// unchanged and are never wrapped.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while optimizing a child or while rebuilding a
    /// node via `with_new_children`.
    fn optimize(
        &self,
        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
        config: &crate::execution::context::ExecutionConfig,
    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
        // Optimize the subtree first so nested operators are wrapped too.
        let children = plan
            .children()
            .iter()
            .map(|child| self.optimize(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;

        // Leaf node: there are no children to replace and nothing to wrap.
        // `children` has one entry per original child, so this matches
        // `plan.children().is_empty()` without calling `children()` again.
        if children.is_empty() {
            return Ok(plan);
        }

        // Decide on the original node's concrete type before it is rebuilt.
        let plan_any = plan.as_any();
        let wrap_in_coalesce = plan_any.is::<FilterExec>()
            || plan_any.is::<HashJoinExec>()
            || plan_any.is::<RepartitionExec>();

        let plan = plan.with_new_children(children)?;
        if wrap_in_coalesce {
            // Target is half the configured batch size.
            // NOTE(review): the rationale for halving is not visible here. A
            // `batch_size` of 0 or 1 yields a target of 0; confirm that
            // CoalesceBatchesExec handles that as intended.
            let target_batch_size = config.batch_size / 2;
            Ok(Arc::new(CoalesceBatchesExec::new(plan, target_batch_size)))
        } else {
            Ok(plan)
        }
    }

    /// Name under which this optimizer rule is identified.
    fn name(&self) -> &str {
        "coalesce_batches"
    }
}