Skip to main content

datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation with produce_one_row=false execution plan
19
20use std::sync::Arc;
21
22use crate::memory::MemoryStream;
23use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
24use crate::{
25    DisplayFormatType, ExecutionPlan, Partitioning,
26    execution_plan::{Boundedness, EmissionType},
27};
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::stats::Precision;
32use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
/// Execution plan for empty relation with produce_one_row=false
///
/// Executing a valid partition returns a stream built from `EmptyExec::data`,
/// which always supplies an empty list of record batches.
#[derive(Debug, Clone)]
pub struct EmptyExec {
    /// The schema reported by this plan and attached to its output stream
    schema: SchemaRef,
    /// Number of partitions
    partitions: usize,
    /// Cached plan properties, created by `compute_properties`; its
    /// `partitioning` is rewritten by `with_partitions`
    cache: Arc<PlanProperties>,
}
48
49impl EmptyExec {
50    /// Create a new EmptyExec
51    pub fn new(schema: SchemaRef) -> Self {
52        let cache = Self::compute_properties(Arc::clone(&schema), 1);
53        EmptyExec {
54            schema,
55            partitions: 1,
56            cache: Arc::new(cache),
57        }
58    }
59
60    /// Create a new EmptyExec with specified partition number
61    pub fn with_partitions(mut self, partitions: usize) -> Self {
62        self.partitions = partitions;
63        // Changing partitions may invalidate output partitioning, so update it:
64        let output_partitioning = Self::output_partitioning_helper(self.partitions);
65        Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
66        self
67    }
68
69    fn data(&self) -> Result<Vec<RecordBatch>> {
70        Ok(vec![])
71    }
72
73    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74        Partitioning::UnknownPartitioning(n_partitions)
75    }
76
77    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
78    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79        PlanProperties::new(
80            EquivalenceProperties::new(schema),
81            Self::output_partitioning_helper(n_partitions),
82            EmissionType::Incremental,
83            Boundedness::Bounded,
84        )
85        .with_scheduling_type(SchedulingType::Cooperative)
86    }
87}
88
89impl DisplayAs for EmptyExec {
90    fn fmt_as(
91        &self,
92        t: DisplayFormatType,
93        f: &mut std::fmt::Formatter,
94    ) -> std::fmt::Result {
95        match t {
96            DisplayFormatType::Default | DisplayFormatType::Verbose => {
97                write!(f, "EmptyExec")
98            }
99            DisplayFormatType::TreeRender => {
100                // TODO: collect info
101                write!(f, "")
102            }
103        }
104    }
105}
106
107impl ExecutionPlan for EmptyExec {
108    fn name(&self) -> &'static str {
109        "EmptyExec"
110    }
111
112    /// Return a reference to Any that can be used for downcasting
113    fn properties(&self) -> &Arc<PlanProperties> {
114        &self.cache
115    }
116
117    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
118        vec![]
119    }
120
121    fn with_new_children(
122        self: Arc<Self>,
123        _: Vec<Arc<dyn ExecutionPlan>>,
124    ) -> Result<Arc<dyn ExecutionPlan>> {
125        Ok(self)
126    }
127
128    fn execute(
129        &self,
130        partition: usize,
131        context: Arc<TaskContext>,
132    ) -> Result<SendableRecordBatchStream> {
133        trace!(
134            "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
135            partition,
136            context.session_id(),
137            context.task_id()
138        );
139
140        assert_or_internal_err!(
141            partition < self.partitions,
142            "EmptyExec invalid partition {} (expected less than {})",
143            partition,
144            self.partitions
145        );
146
147        Ok(Box::pin(MemoryStream::try_new(
148            self.data()?,
149            Arc::clone(&self.schema),
150            None,
151        )?))
152    }
153
154    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
155        if let Some(partition) = partition {
156            assert_or_internal_err!(
157                partition < self.partitions,
158                "EmptyExec invalid partition {} (expected less than {})",
159                partition,
160                self.partitions
161            );
162        }
163
164        // Build explicit stats: exact zero rows and bytes, with explicit known column stats
165        let mut stats = Statistics::default()
166            .with_num_rows(Precision::Exact(0))
167            .with_total_byte_size(Precision::Exact(0));
168
169        // Add explicit column stats for each field in schema
170        for _ in self.schema.fields() {
171            stats = stats.add_column_statistics(ColumnStatistics {
172                null_count: Precision::Exact(0),
173                distinct_count: Precision::Exact(0),
174                min_value: Precision::<ScalarValue>::Absent,
175                max_value: Precision::<ScalarValue>::Absent,
176                sum_value: Precision::<ScalarValue>::Absent,
177                byte_size: Precision::Exact(0),
178            });
179        }
180
181        Ok(Arc::new(stats))
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the single partition yields no batches at all.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let exec = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(exec.schema(), schema);

        // We should have no results
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());

        Ok(())
    }

    /// An empty child list is accepted; a non-empty one is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let exec = Arc::new(EmptyExec::new(test::aggr_test_schema()));

        let unchanged = with_new_children_if_necessary(
            Arc::clone(&exec) as Arc<dyn ExecutionPlan>,
            Vec::new(),
        )?;
        assert_eq!(exec.schema(), unchanged.schema());

        let outcome = with_new_children_if_necessary(exec, vec![unchanged]);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Partition indexes outside the single available partition are errors.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let exec = EmptyExec::new(test::aggr_test_schema());

        // ask for the wrong partition
        for out_of_range in [1_usize, 20] {
            assert!(exec.execute(out_of_range, Arc::clone(&task_ctx)).is_err());
        }
        Ok(())
    }
}