datafusion_physical_plan/
placeholder_row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation produce_one_row=true execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::coop::cooperative;
24use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
25use crate::memory::MemoryStream;
26use crate::{
27    common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28    SendableRecordBatchStream, Statistics,
29};
30
31use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
32use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
33use datafusion_common::{internal_err, Result};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use log::trace;
38
39/// Execution plan for empty relation with produce_one_row=true
40#[derive(Debug, Clone)]
41pub struct PlaceholderRowExec {
42    /// The schema for the produced row
43    schema: SchemaRef,
44    /// Number of partitions
45    partitions: usize,
46    cache: PlanProperties,
47}
48
49impl PlaceholderRowExec {
50    /// Create a new PlaceholderRowExec
51    pub fn new(schema: SchemaRef) -> Self {
52        let partitions = 1;
53        let cache = Self::compute_properties(Arc::clone(&schema), partitions);
54        PlaceholderRowExec {
55            schema,
56            partitions,
57            cache,
58        }
59    }
60
61    /// Create a new PlaceholderRowExecPlaceholderRowExec with specified partition number
62    pub fn with_partitions(mut self, partitions: usize) -> Self {
63        self.partitions = partitions;
64        // Update output partitioning when updating partitions:
65        let output_partitioning = Self::output_partitioning_helper(self.partitions);
66        self.cache = self.cache.with_partitioning(output_partitioning);
67        self
68    }
69
70    fn data(&self) -> Result<Vec<RecordBatch>> {
71        Ok({
72            let n_field = self.schema.fields.len();
73            vec![RecordBatch::try_new_with_options(
74                Arc::new(Schema::new(
75                    (0..n_field)
76                        .map(|i| {
77                            Field::new(format!("placeholder_{i}"), DataType::Null, true)
78                        })
79                        .collect::<Fields>(),
80                )),
81                (0..n_field)
82                    .map(|_i| {
83                        let ret: ArrayRef = Arc::new(NullArray::new(1));
84                        ret
85                    })
86                    .collect(),
87                // Even if column number is empty we can generate single row.
88                &RecordBatchOptions::new().with_row_count(Some(1)),
89            )?]
90        })
91    }
92
93    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
94        Partitioning::UnknownPartitioning(n_partitions)
95    }
96
97    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
98    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
99        PlanProperties::new(
100            EquivalenceProperties::new(schema),
101            Self::output_partitioning_helper(n_partitions),
102            EmissionType::Incremental,
103            Boundedness::Bounded,
104        )
105        .with_scheduling_type(SchedulingType::Cooperative)
106    }
107}
108
109impl DisplayAs for PlaceholderRowExec {
110    fn fmt_as(
111        &self,
112        t: DisplayFormatType,
113        f: &mut std::fmt::Formatter,
114    ) -> std::fmt::Result {
115        match t {
116            DisplayFormatType::Default | DisplayFormatType::Verbose => {
117                write!(f, "PlaceholderRowExec")
118            }
119
120            DisplayFormatType::TreeRender => Ok(()),
121        }
122    }
123}
124
125impl ExecutionPlan for PlaceholderRowExec {
126    fn name(&self) -> &'static str {
127        "PlaceholderRowExec"
128    }
129
130    /// Return a reference to Any that can be used for downcasting
131    fn as_any(&self) -> &dyn Any {
132        self
133    }
134
135    fn properties(&self) -> &PlanProperties {
136        &self.cache
137    }
138
139    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
140        vec![]
141    }
142
143    fn with_new_children(
144        self: Arc<Self>,
145        _: Vec<Arc<dyn ExecutionPlan>>,
146    ) -> Result<Arc<dyn ExecutionPlan>> {
147        Ok(self)
148    }
149
150    fn execute(
151        &self,
152        partition: usize,
153        context: Arc<TaskContext>,
154    ) -> Result<SendableRecordBatchStream> {
155        trace!("Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
156
157        if partition >= self.partitions {
158            return internal_err!(
159                "PlaceholderRowExec invalid partition {} (expected less than {})",
160                partition,
161                self.partitions
162            );
163        }
164
165        let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
166        Ok(Box::pin(cooperative(ms)))
167    }
168
169    fn statistics(&self) -> Result<Statistics> {
170        self.partition_statistics(None)
171    }
172
173    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
174        let batches = self
175            .data()
176            .expect("Create single row placeholder RecordBatch should not fail");
177
178        let batches = match partition {
179            Some(_) => vec![batches],
180            // entire plan
181            None => vec![batches; self.partitions],
182        };
183
184        Ok(common::compute_record_batch_statistics(
185            &batches,
186            &self.schema,
187            None,
188        ))
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Ask for the wrong partition
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Should have one batch containing exactly one row
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Each partition should have one batch containing exactly one row
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }

    #[test]
    fn statistics_count_one_row_per_partition() -> Result<()> {
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(3);

        // Whole plan: one row from each of the three partitions
        let stats = placeholder.partition_statistics(None)?;
        assert_eq!(stats.num_rows, Precision::Exact(3));

        // A single partition contributes exactly one row
        let stats = placeholder.partition_statistics(Some(0))?;
        assert_eq!(stats.num_rows, Precision::Exact(1));

        Ok(())
    }
}