datafusion_physical_optimizer/
coalesce_async_exec_input.rs1use crate::PhysicalOptimizerRule;
19use datafusion_common::config::ConfigOptions;
20use datafusion_common::internal_err;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_physical_plan::async_func::AsyncFuncExec;
23use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
24use datafusion_physical_plan::ExecutionPlan;
25use std::sync::Arc;
26
27#[derive(Default, Debug)]
29pub struct CoalesceAsyncExecInput {}
30
31impl CoalesceAsyncExecInput {
32 #[allow(missing_docs)]
33 pub fn new() -> Self {
34 Self::default()
35 }
36}
37
38impl PhysicalOptimizerRule for CoalesceAsyncExecInput {
39 fn optimize(
40 &self,
41 plan: Arc<dyn ExecutionPlan>,
42 config: &ConfigOptions,
43 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
44 let target_batch_size = config.execution.batch_size;
45 plan.transform(|plan| {
46 if let Some(async_exec) = plan.as_any().downcast_ref::<AsyncFuncExec>() {
47 if async_exec.children().len() != 1 {
48 return internal_err!(
49 "Expected AsyncFuncExec to have exactly one child"
50 );
51 }
52 let child = Arc::clone(async_exec.children()[0]);
53 let coalesce_exec =
54 Arc::new(CoalesceBatchesExec::new(child, target_batch_size));
55 let coalesce_async_exec = plan.with_new_children(vec![coalesce_exec])?;
56 Ok(Transformed::yes(coalesce_async_exec))
57 } else {
58 Ok(Transformed::no(plan))
59 }
60 })
61 .data()
62 }
63
64 fn name(&self) -> &str {
65 "coalesce_async_exec_input"
66 }
67
68 fn schema_check(&self) -> bool {
69 true
70 }
71}