Skip to main content

datafusion_physical_plan/
placeholder_row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation produce_one_row=true execution plan
19
20use std::sync::Arc;
21
22use crate::coop::cooperative;
23use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
24use crate::memory::MemoryStream;
25use crate::{
26    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
27    SendableRecordBatchStream, Statistics, common,
28};
29
30use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
31use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
32use datafusion_common::{Result, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38/// Execution plan for empty relation with produce_one_row=true
39#[derive(Debug, Clone)]
40pub struct PlaceholderRowExec {
41    /// The schema for the produced row
42    schema: SchemaRef,
43    /// Number of partitions
44    partitions: usize,
45    cache: Arc<PlanProperties>,
46}
47
48impl PlaceholderRowExec {
49    /// Create a new PlaceholderRowExec
50    pub fn new(schema: SchemaRef) -> Self {
51        let partitions = 1;
52        let cache = Self::compute_properties(Arc::clone(&schema), partitions);
53        PlaceholderRowExec {
54            schema,
55            partitions,
56            cache: Arc::new(cache),
57        }
58    }
59
60    /// Create a new PlaceholderRowExecPlaceholderRowExec with specified partition number
61    pub fn with_partitions(mut self, partitions: usize) -> Self {
62        self.partitions = partitions;
63        // Update output partitioning when updating partitions:
64        let output_partitioning = Self::output_partitioning_helper(self.partitions);
65        Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
66        self
67    }
68
69    fn data(&self) -> Result<Vec<RecordBatch>> {
70        Ok({
71            let n_field = self.schema.fields.len();
72            vec![RecordBatch::try_new_with_options(
73                Arc::new(Schema::new(
74                    (0..n_field)
75                        .map(|i| {
76                            Field::new(format!("placeholder_{i}"), DataType::Null, true)
77                        })
78                        .collect::<Fields>(),
79                )),
80                (0..n_field)
81                    .map(|_i| {
82                        let ret: ArrayRef = Arc::new(NullArray::new(1));
83                        ret
84                    })
85                    .collect(),
86                // Even if column number is empty we can generate single row.
87                &RecordBatchOptions::new().with_row_count(Some(1)),
88            )?]
89        })
90    }
91
92    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
93        Partitioning::UnknownPartitioning(n_partitions)
94    }
95
96    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
97    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
98        PlanProperties::new(
99            EquivalenceProperties::new(schema),
100            Self::output_partitioning_helper(n_partitions),
101            EmissionType::Incremental,
102            Boundedness::Bounded,
103        )
104        .with_scheduling_type(SchedulingType::Cooperative)
105    }
106}
107
108impl DisplayAs for PlaceholderRowExec {
109    fn fmt_as(
110        &self,
111        t: DisplayFormatType,
112        f: &mut std::fmt::Formatter,
113    ) -> std::fmt::Result {
114        match t {
115            DisplayFormatType::Default | DisplayFormatType::Verbose => {
116                write!(f, "PlaceholderRowExec")
117            }
118
119            DisplayFormatType::TreeRender => Ok(()),
120        }
121    }
122}
123
124impl ExecutionPlan for PlaceholderRowExec {
125    fn name(&self) -> &'static str {
126        "PlaceholderRowExec"
127    }
128
129    /// Return a reference to Any that can be used for downcasting
130    fn properties(&self) -> &Arc<PlanProperties> {
131        &self.cache
132    }
133
134    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
135        vec![]
136    }
137
138    fn with_new_children(
139        self: Arc<Self>,
140        _: Vec<Arc<dyn ExecutionPlan>>,
141    ) -> Result<Arc<dyn ExecutionPlan>> {
142        Ok(self)
143    }
144
145    fn execute(
146        &self,
147        partition: usize,
148        context: Arc<TaskContext>,
149    ) -> Result<SendableRecordBatchStream> {
150        trace!(
151            "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
152            partition,
153            context.session_id(),
154            context.task_id()
155        );
156
157        assert_or_internal_err!(
158            partition < self.partitions,
159            "PlaceholderRowExec invalid partition {partition} (expected less than {})",
160            self.partitions
161        );
162
163        let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
164        Ok(Box::pin(cooperative(ms)))
165    }
166
167    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
168        let batches = self
169            .data()
170            .expect("Create single row placeholder RecordBatch should not fail");
171
172        let batches = match partition {
173            Some(_) => vec![batches],
174            // entire plan
175            None => vec![batches; self.partitions],
176        };
177
178        Ok(Arc::new(common::compute_record_batch_statistics(
179            &batches,
180            &self.schema,
181            None,
182        )))
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Ask for the wrong partition
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Should have exactly one batch holding exactly one row
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Each partition should have exactly one batch holding one row
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_without_columns() -> Result<()> {
        // A zero-column schema must still yield a single row.
        let task_ctx = Arc::new(TaskContext::default());
        let placeholder = PlaceholderRowExec::new(Arc::new(Schema::empty()));

        let batches = common::collect(placeholder.execute(0, task_ctx)?).await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1);
        Ok(())
    }

    #[test]
    fn partition_statistics_row_counts() -> Result<()> {
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        // A single partition holds exactly one placeholder row.
        let single = placeholder.partition_statistics(Some(0))?;
        assert_eq!(single.num_rows, Precision::Exact(1));

        // The whole plan holds one placeholder row per partition.
        let whole = placeholder.partition_statistics(None)?;
        assert_eq!(whole.num_rows, Precision::Exact(partitions));
        Ok(())
    }
}