datafusion_physical_plan/
values.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Values execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::memory::MemoryStream;
25use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
26use crate::{
27    ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
28};
29use arrow::datatypes::{Schema, SchemaRef};
30use arrow::record_batch::{RecordBatch, RecordBatchOptions};
31use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::EquivalenceProperties;
34
35/// Execution plan for values list based relation (produces constant rows)
36#[deprecated(
37    since = "45.0.0",
38    note = "Use `MemorySourceConfig::try_new_as_values` instead"
39)]
40#[derive(Debug, Clone)]
41pub struct ValuesExec {
42    /// The schema
43    schema: SchemaRef,
44    /// The data
45    data: Vec<RecordBatch>,
46    /// Cache holding plan properties like equivalences, output partitioning etc.
47    cache: PlanProperties,
48}
49
50#[allow(deprecated)]
51impl ValuesExec {
52    /// Create a new values exec from data as expr
53    #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")]
54    pub fn try_new(
55        schema: SchemaRef,
56        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
57    ) -> Result<Self> {
58        if data.is_empty() {
59            return plan_err!("Values list cannot be empty");
60        }
61        let n_row = data.len();
62        let n_col = schema.fields().len();
63        // We have this single row batch as a placeholder to satisfy evaluation argument
64        // and generate a single output row
65        let batch = RecordBatch::try_new_with_options(
66            Arc::new(Schema::empty()),
67            vec![],
68            &RecordBatchOptions::new().with_row_count(Some(1)),
69        )?;
70
71        let arr = (0..n_col)
72            .map(|j| {
73                (0..n_row)
74                    .map(|i| {
75                        let r = data[i][j].evaluate(&batch);
76
77                        match r {
78                            Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
79                            Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
80                                ScalarValue::try_from_array(&a, 0)
81                            }
82                            Ok(ColumnarValue::Array(a)) => {
83                                plan_err!(
84                                    "Cannot have array values {a:?} in a values list"
85                                )
86                            }
87                            Err(err) => Err(err),
88                        }
89                    })
90                    .collect::<Result<Vec<_>>>()
91                    .and_then(ScalarValue::iter_to_array)
92            })
93            .collect::<Result<Vec<_>>>()?;
94        let batch = RecordBatch::try_new_with_options(
95            Arc::clone(&schema),
96            arr,
97            &RecordBatchOptions::new().with_row_count(Some(n_row)),
98        )?;
99        let data: Vec<RecordBatch> = vec![batch];
100        Self::try_new_from_batches(schema, data)
101    }
102
103    /// Create a new plan using the provided schema and batches.
104    ///
105    /// Errors if any of the batches don't match the provided schema, or if no
106    /// batches are provided.
107    #[deprecated(
108        since = "45.0.0",
109        note = "Use `MemoryExec::try_new_from_batches` instead"
110    )]
111    pub fn try_new_from_batches(
112        schema: SchemaRef,
113        batches: Vec<RecordBatch>,
114    ) -> Result<Self> {
115        if batches.is_empty() {
116            return plan_err!("Values list cannot be empty");
117        }
118
119        for batch in &batches {
120            let batch_schema = batch.schema();
121            if batch_schema != schema {
122                return plan_err!(
123                    "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
124                );
125            }
126        }
127
128        let cache = Self::compute_properties(Arc::clone(&schema));
129        #[allow(deprecated)]
130        Ok(ValuesExec {
131            schema,
132            data: batches,
133            cache,
134        })
135    }
136
137    /// Provides the data
138    pub fn data(&self) -> Vec<RecordBatch> {
139        #[allow(deprecated)]
140        self.data.clone()
141    }
142
143    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
144    fn compute_properties(schema: SchemaRef) -> PlanProperties {
145        PlanProperties::new(
146            EquivalenceProperties::new(schema),
147            Partitioning::UnknownPartitioning(1),
148            EmissionType::Incremental,
149            Boundedness::Bounded,
150        )
151    }
152}
153
154#[allow(deprecated)]
155impl DisplayAs for ValuesExec {
156    fn fmt_as(
157        &self,
158        t: DisplayFormatType,
159        f: &mut std::fmt::Formatter,
160    ) -> std::fmt::Result {
161        match t {
162            DisplayFormatType::Default | DisplayFormatType::Verbose => {
163                write!(f, "ValuesExec")
164            }
165            DisplayFormatType::TreeRender => {
166                // TODO: collect info
167                write!(f, "")
168            }
169        }
170    }
171}
172
173#[allow(deprecated)]
174impl ExecutionPlan for ValuesExec {
175    fn name(&self) -> &'static str {
176        "ValuesExec"
177    }
178
179    /// Return a reference to Any that can be used for downcasting
180    fn as_any(&self) -> &dyn Any {
181        self
182    }
183
184    fn properties(&self) -> &PlanProperties {
185        #[allow(deprecated)]
186        &self.cache
187    }
188
189    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
190        vec![]
191    }
192
193    fn with_new_children(
194        self: Arc<Self>,
195        _: Vec<Arc<dyn ExecutionPlan>>,
196    ) -> Result<Arc<dyn ExecutionPlan>> {
197        #[allow(deprecated)]
198        ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
199            .map(|e| Arc::new(e) as _)
200    }
201
202    fn execute(
203        &self,
204        partition: usize,
205        _context: Arc<TaskContext>,
206    ) -> Result<SendableRecordBatchStream> {
207        // ValuesExec has a single output partition
208        if 0 != partition {
209            return internal_err!(
210                "ValuesExec invalid partition {partition} (expected 0)"
211            );
212        }
213
214        Ok(Box::pin(MemoryStream::try_new(
215            self.data(),
216            #[allow(deprecated)]
217            Arc::clone(&self.schema),
218            None,
219        )?))
220    }
221
222    fn statistics(&self) -> Result<Statistics> {
223        let batch = self.data();
224        Ok(common::compute_record_batch_statistics(
225            &[batch],
226            #[allow(deprecated)]
227            &self.schema,
228            None,
229        ))
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::lit;
    use crate::test::{self, make_partition};

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::stats::{ColumnStatistics, Precision};

    /// An empty values list must be rejected.
    #[test]
    fn values_empty_case() {
        let schema = test::aggr_test_schema();
        #[allow(deprecated)]
        let result = ValuesExec::try_new(schema, vec![]);
        assert!(result.is_err());
    }

    /// Batches whose schema matches the declared schema are accepted.
    #[test]
    fn new_exec_with_batches() {
        let partition = make_partition(7);
        let schema = partition.schema();
        let batches = vec![partition.clone(), partition];
        #[allow(deprecated)]
        let result = ValuesExec::try_new_from_batches(schema, batches);
        result.expect("batches matching the schema should be accepted");
    }

    /// An empty batch list must be rejected.
    #[test]
    fn new_exec_with_batches_empty() {
        let schema = make_partition(7).schema();
        #[allow(deprecated)]
        let result = ValuesExec::try_new_from_batches(schema, Vec::new());
        assert!(result.is_err());
    }

    /// Batches whose schema differs from the declared schema are rejected.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let partition = make_partition(7);
        let batches = vec![partition.clone(), partition];

        let mismatched = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        #[allow(deprecated)]
        let result = ValuesExec::try_new_from_batches(mismatched, batches);
        assert!(result.is_err());
    }

    // Test issue: https://github.com/apache/datafusion/issues/8763
    // A non-null value is accepted for a non-nullable column; a null one is not.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));

        #[allow(deprecated)]
        let accepted = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]);
        accepted.expect("a non-null value should be accepted");

        #[allow(deprecated)]
        let rejected =
            ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]);
        assert!(rejected.is_err());
    }

    /// Statistics over a column that holds only nulls report every row as null.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let rows = 3;
        let data = (0..rows).map(|_| vec![lit(ScalarValue::Null)]).collect();
        let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));

        #[allow(deprecated)]
        let values = ValuesExec::try_new(schema, data)?;

        #[allow(deprecated)]
        let stats = values.statistics()?;

        let expected = Statistics {
            num_rows: Precision::Exact(rows),
            total_byte_size: Precision::Exact(8), // not important
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(rows), // there are only nulls
                ..ColumnStatistics::new_unknown()
            }],
        };
        assert_eq!(stats, expected);

        Ok(())
    }
}