datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation with produce_one_row=false execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26    execution_plan::{Boundedness, EmissionType},
27    DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38/// Execution plan for empty relation with produce_one_row=false
39#[derive(Debug, Clone)]
40pub struct EmptyExec {
41    /// The schema for the produced row
42    schema: SchemaRef,
43    /// Number of partitions
44    partitions: usize,
45    cache: PlanProperties,
46}
47
48impl EmptyExec {
49    /// Create a new EmptyExec
50    pub fn new(schema: SchemaRef) -> Self {
51        let cache = Self::compute_properties(Arc::clone(&schema), 1);
52        EmptyExec {
53            schema,
54            partitions: 1,
55            cache,
56        }
57    }
58
59    /// Create a new EmptyExec with specified partition number
60    pub fn with_partitions(mut self, partitions: usize) -> Self {
61        self.partitions = partitions;
62        // Changing partitions may invalidate output partitioning, so update it:
63        let output_partitioning = Self::output_partitioning_helper(self.partitions);
64        self.cache = self.cache.with_partitioning(output_partitioning);
65        self
66    }
67
68    fn data(&self) -> Result<Vec<RecordBatch>> {
69        Ok(vec![])
70    }
71
72    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
73        Partitioning::UnknownPartitioning(n_partitions)
74    }
75
76    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
77    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
78        PlanProperties::new(
79            EquivalenceProperties::new(schema),
80            Self::output_partitioning_helper(n_partitions),
81            EmissionType::Incremental,
82            Boundedness::Bounded,
83        )
84    }
85}
86
87impl DisplayAs for EmptyExec {
88    fn fmt_as(
89        &self,
90        t: DisplayFormatType,
91        f: &mut std::fmt::Formatter,
92    ) -> std::fmt::Result {
93        match t {
94            DisplayFormatType::Default | DisplayFormatType::Verbose => {
95                write!(f, "EmptyExec")
96            }
97            DisplayFormatType::TreeRender => {
98                // TODO: collect info
99                write!(f, "")
100            }
101        }
102    }
103}
104
105impl ExecutionPlan for EmptyExec {
106    fn name(&self) -> &'static str {
107        "EmptyExec"
108    }
109
110    /// Return a reference to Any that can be used for downcasting
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114
115    fn properties(&self) -> &PlanProperties {
116        &self.cache
117    }
118
119    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
120        vec![]
121    }
122
123    fn with_new_children(
124        self: Arc<Self>,
125        _: Vec<Arc<dyn ExecutionPlan>>,
126    ) -> Result<Arc<dyn ExecutionPlan>> {
127        Ok(self)
128    }
129
130    fn execute(
131        &self,
132        partition: usize,
133        context: Arc<TaskContext>,
134    ) -> Result<SendableRecordBatchStream> {
135        trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
136
137        if partition >= self.partitions {
138            return internal_err!(
139                "EmptyExec invalid partition {} (expected less than {})",
140                partition,
141                self.partitions
142            );
143        }
144
145        Ok(Box::pin(MemoryStream::try_new(
146            self.data()?,
147            Arc::clone(&self.schema),
148            None,
149        )?))
150    }
151
152    fn statistics(&self) -> Result<Statistics> {
153        self.partition_statistics(None)
154    }
155
156    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
157        if let Some(partition) = partition {
158            if partition >= self.partitions {
159                return internal_err!(
160                    "EmptyExec invalid partition {} (expected less than {})",
161                    partition,
162                    self.partitions
163                );
164            }
165        }
166
167        let batch = self
168            .data()
169            .expect("Create empty RecordBatch should not fail");
170        Ok(common::compute_record_batch_statistics(
171            &[batch],
172            &self.schema,
173            None,
174        ))
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the default partition yields no record batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let exec = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(exec.schema(), schema);

        // We should have no results
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches = common::collect(stream).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    /// An empty child list is accepted; a non-empty one is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let exec = Arc::new(EmptyExec::new(test::aggr_test_schema()));

        let unchanged = with_new_children_if_necessary(
            Arc::clone(&exec) as Arc<dyn ExecutionPlan>,
            Vec::new(),
        )?;
        assert_eq!(exec.schema(), unchanged.schema());

        // Supplying a child to this childless plan is expected to fail.
        let result = with_new_children_if_necessary(exec, vec![unchanged]);
        assert!(
            result.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Partitions outside the single default partition are rejected.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let exec = EmptyExec::new(test::aggr_test_schema());
        let task_ctx = Arc::new(TaskContext::default());

        // ask for the wrong partition
        for partition in [1, 20] {
            assert!(exec.execute(partition, Arc::clone(&task_ctx)).is_err());
        }
        Ok(())
    }
}