Skip to main content

datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution plan for an `EmptyRelation` with `produce_one_row=false`
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26    DisplayFormatType, ExecutionPlan, Partitioning,
27    execution_plan::{Boundedness, EmissionType},
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::stats::Precision;
33use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use crate::execution_plan::SchedulingType;
38use log::trace;
39
40/// Execution plan for empty relation with produce_one_row=false
41#[derive(Debug, Clone)]
42pub struct EmptyExec {
43    /// The schema for the produced row
44    schema: SchemaRef,
45    /// Number of partitions
46    partitions: usize,
47    cache: Arc<PlanProperties>,
48}
49
50impl EmptyExec {
51    /// Create a new EmptyExec
52    pub fn new(schema: SchemaRef) -> Self {
53        let cache = Self::compute_properties(Arc::clone(&schema), 1);
54        EmptyExec {
55            schema,
56            partitions: 1,
57            cache: Arc::new(cache),
58        }
59    }
60
61    /// Create a new EmptyExec with specified partition number
62    pub fn with_partitions(mut self, partitions: usize) -> Self {
63        self.partitions = partitions;
64        // Changing partitions may invalidate output partitioning, so update it:
65        let output_partitioning = Self::output_partitioning_helper(self.partitions);
66        Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
67        self
68    }
69
70    fn data(&self) -> Result<Vec<RecordBatch>> {
71        Ok(vec![])
72    }
73
74    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
75        Partitioning::UnknownPartitioning(n_partitions)
76    }
77
78    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
79    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
80        PlanProperties::new(
81            EquivalenceProperties::new(schema),
82            Self::output_partitioning_helper(n_partitions),
83            EmissionType::Incremental,
84            Boundedness::Bounded,
85        )
86        .with_scheduling_type(SchedulingType::Cooperative)
87    }
88}
89
90impl DisplayAs for EmptyExec {
91    fn fmt_as(
92        &self,
93        t: DisplayFormatType,
94        f: &mut std::fmt::Formatter,
95    ) -> std::fmt::Result {
96        match t {
97            DisplayFormatType::Default | DisplayFormatType::Verbose => {
98                write!(f, "EmptyExec")
99            }
100            DisplayFormatType::TreeRender => {
101                // TODO: collect info
102                write!(f, "")
103            }
104        }
105    }
106}
107
108impl ExecutionPlan for EmptyExec {
109    fn name(&self) -> &'static str {
110        "EmptyExec"
111    }
112
113    /// Return a reference to Any that can be used for downcasting
114    fn as_any(&self) -> &dyn Any {
115        self
116    }
117
118    fn properties(&self) -> &Arc<PlanProperties> {
119        &self.cache
120    }
121
122    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123        vec![]
124    }
125
126    fn with_new_children(
127        self: Arc<Self>,
128        _: Vec<Arc<dyn ExecutionPlan>>,
129    ) -> Result<Arc<dyn ExecutionPlan>> {
130        Ok(self)
131    }
132
133    fn execute(
134        &self,
135        partition: usize,
136        context: Arc<TaskContext>,
137    ) -> Result<SendableRecordBatchStream> {
138        trace!(
139            "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
140            partition,
141            context.session_id(),
142            context.task_id()
143        );
144
145        assert_or_internal_err!(
146            partition < self.partitions,
147            "EmptyExec invalid partition {} (expected less than {})",
148            partition,
149            self.partitions
150        );
151
152        Ok(Box::pin(MemoryStream::try_new(
153            self.data()?,
154            Arc::clone(&self.schema),
155            None,
156        )?))
157    }
158
159    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
160        if let Some(partition) = partition {
161            assert_or_internal_err!(
162                partition < self.partitions,
163                "EmptyExec invalid partition {} (expected less than {})",
164                partition,
165                self.partitions
166            );
167        }
168
169        // Build explicit stats: exact zero rows and bytes, with explicit known column stats
170        let mut stats = Statistics::default()
171            .with_num_rows(Precision::Exact(0))
172            .with_total_byte_size(Precision::Exact(0));
173
174        // Add explicit column stats for each field in schema
175        for _ in self.schema.fields() {
176            stats = stats.add_column_statistics(ColumnStatistics {
177                null_count: Precision::Exact(0),
178                distinct_count: Precision::Exact(0),
179                min_value: Precision::<ScalarValue>::Absent,
180                max_value: Precision::<ScalarValue>::Absent,
181                sum_value: Precision::<ScalarValue>::Absent,
182                byte_size: Precision::Exact(0),
183            });
184        }
185
186        Ok(stats)
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the single partition of a fresh plan yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let task_ctx = Arc::new(TaskContext::default());

        let plan = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(plan.schema(), schema);

        // We should have no results
        let stream = plan.execute(0, task_ctx)?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());

        Ok(())
    }

    /// Replacing children with an empty list keeps the schema; supplying a
    /// child must be rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let plan = Arc::new(EmptyExec::new(test::aggr_test_schema()));

        let rebuilt = with_new_children_if_necessary(
            Arc::clone(&plan) as Arc<dyn ExecutionPlan>,
            Vec::new(),
        )?;
        assert_eq!(plan.schema(), rebuilt.schema());

        let too_many_kids = vec![rebuilt];
        let outcome = with_new_children_if_necessary(plan, too_many_kids);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Requesting a partition index outside the valid range is an error.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let plan = EmptyExec::new(test::aggr_test_schema());

        // ask for the wrong partition
        for bad_partition in [1, 20] {
            assert!(plan.execute(bad_partition, Arc::clone(&task_ctx)).is_err());
        }
        Ok(())
    }
}