Skip to main content

datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution plan for an empty relation (`EmptyRelation` with `produce_one_row=false`), which produces no rows.
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics, common};
25use crate::{
26    DisplayFormatType, ExecutionPlan, Partitioning,
27    execution_plan::{Boundedness, EmissionType},
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{Result, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
39/// Execution plan for empty relation with produce_one_row=false
40#[derive(Debug, Clone)]
41pub struct EmptyExec {
42    /// The schema for the produced row
43    schema: SchemaRef,
44    /// Number of partitions
45    partitions: usize,
46    cache: PlanProperties,
47}
48
49impl EmptyExec {
50    /// Create a new EmptyExec
51    pub fn new(schema: SchemaRef) -> Self {
52        let cache = Self::compute_properties(Arc::clone(&schema), 1);
53        EmptyExec {
54            schema,
55            partitions: 1,
56            cache,
57        }
58    }
59
60    /// Create a new EmptyExec with specified partition number
61    pub fn with_partitions(mut self, partitions: usize) -> Self {
62        self.partitions = partitions;
63        // Changing partitions may invalidate output partitioning, so update it:
64        let output_partitioning = Self::output_partitioning_helper(self.partitions);
65        self.cache = self.cache.with_partitioning(output_partitioning);
66        self
67    }
68
69    fn data(&self) -> Result<Vec<RecordBatch>> {
70        Ok(vec![])
71    }
72
73    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74        Partitioning::UnknownPartitioning(n_partitions)
75    }
76
77    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
78    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79        PlanProperties::new(
80            EquivalenceProperties::new(schema),
81            Self::output_partitioning_helper(n_partitions),
82            EmissionType::Incremental,
83            Boundedness::Bounded,
84        )
85        .with_scheduling_type(SchedulingType::Cooperative)
86    }
87}
88
89impl DisplayAs for EmptyExec {
90    fn fmt_as(
91        &self,
92        t: DisplayFormatType,
93        f: &mut std::fmt::Formatter,
94    ) -> std::fmt::Result {
95        match t {
96            DisplayFormatType::Default | DisplayFormatType::Verbose => {
97                write!(f, "EmptyExec")
98            }
99            DisplayFormatType::TreeRender => {
100                // TODO: collect info
101                write!(f, "")
102            }
103        }
104    }
105}
106
107impl ExecutionPlan for EmptyExec {
108    fn name(&self) -> &'static str {
109        "EmptyExec"
110    }
111
112    /// Return a reference to Any that can be used for downcasting
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn properties(&self) -> &PlanProperties {
118        &self.cache
119    }
120
121    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
122        vec![]
123    }
124
125    fn with_new_children(
126        self: Arc<Self>,
127        _: Vec<Arc<dyn ExecutionPlan>>,
128    ) -> Result<Arc<dyn ExecutionPlan>> {
129        Ok(self)
130    }
131
132    fn execute(
133        &self,
134        partition: usize,
135        context: Arc<TaskContext>,
136    ) -> Result<SendableRecordBatchStream> {
137        trace!(
138            "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
139            partition,
140            context.session_id(),
141            context.task_id()
142        );
143
144        assert_or_internal_err!(
145            partition < self.partitions,
146            "EmptyExec invalid partition {} (expected less than {})",
147            partition,
148            self.partitions
149        );
150
151        Ok(Box::pin(MemoryStream::try_new(
152            self.data()?,
153            Arc::clone(&self.schema),
154            None,
155        )?))
156    }
157
158    fn statistics(&self) -> Result<Statistics> {
159        self.partition_statistics(None)
160    }
161
162    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
163        if let Some(partition) = partition {
164            assert_or_internal_err!(
165                partition < self.partitions,
166                "EmptyExec invalid partition {} (expected less than {})",
167                partition,
168                self.partitions
169            );
170        }
171
172        let batch = self
173            .data()
174            .expect("Create empty RecordBatch should not fail");
175        Ok(common::compute_record_batch_statistics(
176            &[batch],
177            &self.schema,
178            None,
179        ))
180    }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // We should have no results
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // ask for the wrong partition
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn empty_with_partitions() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(3);

        // The cached plan properties must reflect the new partition count
        assert_eq!(
            empty.properties().output_partitioning().partition_count(),
            3
        );

        // Every valid partition executes and produces no batches
        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            assert!(common::collect(stream).await?.is_empty());
        }

        // Partitions past the configured count are rejected, both for
        // execution and for per-partition statistics
        assert!(empty.execute(3, Arc::clone(&task_ctx)).is_err());
        assert!(empty.partition_statistics(Some(2)).is_ok());
        assert!(empty.partition_statistics(Some(3)).is_err());
        Ok(())
    }
}