datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation with produce_one_row=false execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26    execution_plan::{Boundedness, EmissionType},
27    DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
39/// Execution plan for empty relation with produce_one_row=false
40#[derive(Debug, Clone)]
41pub struct EmptyExec {
42    /// The schema for the produced row
43    schema: SchemaRef,
44    /// Number of partitions
45    partitions: usize,
46    cache: PlanProperties,
47}
48
49impl EmptyExec {
50    /// Create a new EmptyExec
51    pub fn new(schema: SchemaRef) -> Self {
52        let cache = Self::compute_properties(Arc::clone(&schema), 1);
53        EmptyExec {
54            schema,
55            partitions: 1,
56            cache,
57        }
58    }
59
60    /// Create a new EmptyExec with specified partition number
61    pub fn with_partitions(mut self, partitions: usize) -> Self {
62        self.partitions = partitions;
63        // Changing partitions may invalidate output partitioning, so update it:
64        let output_partitioning = Self::output_partitioning_helper(self.partitions);
65        self.cache = self.cache.with_partitioning(output_partitioning);
66        self
67    }
68
69    fn data(&self) -> Result<Vec<RecordBatch>> {
70        Ok(vec![])
71    }
72
73    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74        Partitioning::UnknownPartitioning(n_partitions)
75    }
76
77    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
78    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79        PlanProperties::new(
80            EquivalenceProperties::new(schema),
81            Self::output_partitioning_helper(n_partitions),
82            EmissionType::Incremental,
83            Boundedness::Bounded,
84        )
85        .with_scheduling_type(SchedulingType::Cooperative)
86    }
87}
88
89impl DisplayAs for EmptyExec {
90    fn fmt_as(
91        &self,
92        t: DisplayFormatType,
93        f: &mut std::fmt::Formatter,
94    ) -> std::fmt::Result {
95        match t {
96            DisplayFormatType::Default | DisplayFormatType::Verbose => {
97                write!(f, "EmptyExec")
98            }
99            DisplayFormatType::TreeRender => {
100                // TODO: collect info
101                write!(f, "")
102            }
103        }
104    }
105}
106
107impl ExecutionPlan for EmptyExec {
108    fn name(&self) -> &'static str {
109        "EmptyExec"
110    }
111
112    /// Return a reference to Any that can be used for downcasting
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn properties(&self) -> &PlanProperties {
118        &self.cache
119    }
120
121    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
122        vec![]
123    }
124
125    fn with_new_children(
126        self: Arc<Self>,
127        _: Vec<Arc<dyn ExecutionPlan>>,
128    ) -> Result<Arc<dyn ExecutionPlan>> {
129        Ok(self)
130    }
131
132    fn execute(
133        &self,
134        partition: usize,
135        context: Arc<TaskContext>,
136    ) -> Result<SendableRecordBatchStream> {
137        trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
138
139        if partition >= self.partitions {
140            return internal_err!(
141                "EmptyExec invalid partition {} (expected less than {})",
142                partition,
143                self.partitions
144            );
145        }
146
147        Ok(Box::pin(MemoryStream::try_new(
148            self.data()?,
149            Arc::clone(&self.schema),
150            None,
151        )?))
152    }
153
154    fn statistics(&self) -> Result<Statistics> {
155        self.partition_statistics(None)
156    }
157
158    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
159        if let Some(partition) = partition {
160            if partition >= self.partitions {
161                return internal_err!(
162                    "EmptyExec invalid partition {} (expected less than {})",
163                    partition,
164                    self.partitions
165                );
166            }
167        }
168
169        let batch = self
170            .data()
171            .expect("Create empty RecordBatch should not fail");
172        Ok(common::compute_record_batch_statistics(
173            &[batch],
174            &self.schema,
175            None,
176        ))
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // We should have no results
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // ask for the wrong partition
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn with_partitions_updates_partitioning() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(3);

        // The cached plan properties must reflect the new partition count
        assert_eq!(
            empty.properties().output_partitioning().partition_count(),
            3
        );

        // Every valid partition can be executed and yields no batches
        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            let batches = common::collect(stream).await?;
            assert!(batches.is_empty());
        }

        // Partitions beyond the configured count are rejected
        assert!(empty.execute(3, task_ctx).is_err());
        Ok(())
    }

    #[test]
    fn partition_statistics_are_empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);

        let per_partition = empty.partition_statistics(Some(1))?;
        assert_eq!(per_partition.num_rows, Precision::Exact(0));
        assert_eq!(
            per_partition.column_statistics.len(),
            schema.fields().len()
        );

        let whole_plan = empty.partition_statistics(None)?;
        assert_eq!(whole_plan.num_rows, Precision::Exact(0));

        // Out-of-range partitions are rejected
        assert!(empty.partition_statistics(Some(2)).is_err());
        Ok(())
    }
}