datafusion_physical_optimizer/
coalesce_async_exec_input.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::PhysicalOptimizerRule;
19use datafusion_common::config::ConfigOptions;
20use datafusion_common::internal_err;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_physical_plan::async_func::AsyncFuncExec;
23use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
24use datafusion_physical_plan::ExecutionPlan;
25use std::sync::Arc;
26
/// Physical optimizer rule that inserts a [`CoalesceBatchesExec`] directly
/// beneath every [`AsyncFuncExec`] in the plan.
///
/// The inserted node is created with the configured `execution.batch_size`
/// as its target, so the input of the async operator is coalesced toward
/// that size, which reduces the number of async executions.
#[derive(Default, Debug)]
pub struct CoalesceAsyncExecInput {}
30
31impl CoalesceAsyncExecInput {
32    #[allow(missing_docs)]
33    pub fn new() -> Self {
34        Self::default()
35    }
36}
37
38impl PhysicalOptimizerRule for CoalesceAsyncExecInput {
39    fn optimize(
40        &self,
41        plan: Arc<dyn ExecutionPlan>,
42        config: &ConfigOptions,
43    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
44        let target_batch_size = config.execution.batch_size;
45        plan.transform(|plan| {
46            if let Some(async_exec) = plan.as_any().downcast_ref::<AsyncFuncExec>() {
47                if async_exec.children().len() != 1 {
48                    return internal_err!(
49                        "Expected AsyncFuncExec to have exactly one child"
50                    );
51                }
52                let child = Arc::clone(async_exec.children()[0]);
53                let coalesce_exec =
54                    Arc::new(CoalesceBatchesExec::new(child, target_batch_size));
55                let coalesce_async_exec = plan.with_new_children(vec![coalesce_exec])?;
56                Ok(Transformed::yes(coalesce_async_exec))
57            } else {
58                Ok(Transformed::no(plan))
59            }
60        })
61        .data()
62    }
63
64    fn name(&self) -> &str {
65        "coalesce_async_exec_input"
66    }
67
68    fn schema_check(&self) -> bool {
69        true
70    }
71}