// datafusion_physical_plan/values.rs

use std::any::Any;
use std::sync::Arc;

use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
use crate::{
    ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

35#[deprecated(
37 since = "45.0.0",
38 note = "Use `MemorySourceConfig::try_new_as_values` instead"
39)]
40#[derive(Debug, Clone)]
41pub struct ValuesExec {
42 schema: SchemaRef,
44 data: Vec<RecordBatch>,
46 cache: PlanProperties,
48}
49
50#[allow(deprecated)]
51impl ValuesExec {
52 #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")]
54 pub fn try_new(
55 schema: SchemaRef,
56 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
57 ) -> Result<Self> {
58 if data.is_empty() {
59 return plan_err!("Values list cannot be empty");
60 }
61 let n_row = data.len();
62 let n_col = schema.fields().len();
63 let batch = RecordBatch::try_new_with_options(
66 Arc::new(Schema::empty()),
67 vec![],
68 &RecordBatchOptions::new().with_row_count(Some(1)),
69 )?;
70
71 let arr = (0..n_col)
72 .map(|j| {
73 (0..n_row)
74 .map(|i| {
75 let r = data[i][j].evaluate(&batch);
76
77 match r {
78 Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
79 Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
80 ScalarValue::try_from_array(&a, 0)
81 }
82 Ok(ColumnarValue::Array(a)) => {
83 plan_err!(
84 "Cannot have array values {a:?} in a values list"
85 )
86 }
87 Err(err) => Err(err),
88 }
89 })
90 .collect::<Result<Vec<_>>>()
91 .and_then(ScalarValue::iter_to_array)
92 })
93 .collect::<Result<Vec<_>>>()?;
94 let batch = RecordBatch::try_new_with_options(
95 Arc::clone(&schema),
96 arr,
97 &RecordBatchOptions::new().with_row_count(Some(n_row)),
98 )?;
99 let data: Vec<RecordBatch> = vec![batch];
100 Self::try_new_from_batches(schema, data)
101 }
102
103 #[deprecated(
108 since = "45.0.0",
109 note = "Use `MemoryExec::try_new_from_batches` instead"
110 )]
111 pub fn try_new_from_batches(
112 schema: SchemaRef,
113 batches: Vec<RecordBatch>,
114 ) -> Result<Self> {
115 if batches.is_empty() {
116 return plan_err!("Values list cannot be empty");
117 }
118
119 for batch in &batches {
120 let batch_schema = batch.schema();
121 if batch_schema != schema {
122 return plan_err!(
123 "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
124 );
125 }
126 }
127
128 let cache = Self::compute_properties(Arc::clone(&schema));
129 #[allow(deprecated)]
130 Ok(ValuesExec {
131 schema,
132 data: batches,
133 cache,
134 })
135 }
136
137 pub fn data(&self) -> Vec<RecordBatch> {
139 #[allow(deprecated)]
140 self.data.clone()
141 }
142
143 fn compute_properties(schema: SchemaRef) -> PlanProperties {
145 PlanProperties::new(
146 EquivalenceProperties::new(schema),
147 Partitioning::UnknownPartitioning(1),
148 EmissionType::Incremental,
149 Boundedness::Bounded,
150 )
151 }
152}
153
154#[allow(deprecated)]
155impl DisplayAs for ValuesExec {
156 fn fmt_as(
157 &self,
158 t: DisplayFormatType,
159 f: &mut std::fmt::Formatter,
160 ) -> std::fmt::Result {
161 match t {
162 DisplayFormatType::Default | DisplayFormatType::Verbose => {
163 write!(f, "ValuesExec")
164 }
165 DisplayFormatType::TreeRender => {
166 write!(f, "")
168 }
169 }
170 }
171}
172
173#[allow(deprecated)]
174impl ExecutionPlan for ValuesExec {
175 fn name(&self) -> &'static str {
176 "ValuesExec"
177 }
178
179 fn as_any(&self) -> &dyn Any {
181 self
182 }
183
184 fn properties(&self) -> &PlanProperties {
185 #[allow(deprecated)]
186 &self.cache
187 }
188
189 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
190 vec![]
191 }
192
193 fn with_new_children(
194 self: Arc<Self>,
195 _: Vec<Arc<dyn ExecutionPlan>>,
196 ) -> Result<Arc<dyn ExecutionPlan>> {
197 #[allow(deprecated)]
198 ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
199 .map(|e| Arc::new(e) as _)
200 }
201
202 fn execute(
203 &self,
204 partition: usize,
205 _context: Arc<TaskContext>,
206 ) -> Result<SendableRecordBatchStream> {
207 if 0 != partition {
209 return internal_err!(
210 "ValuesExec invalid partition {partition} (expected 0)"
211 );
212 }
213
214 Ok(Box::pin(MemoryStream::try_new(
215 self.data(),
216 #[allow(deprecated)]
217 Arc::clone(&self.schema),
218 None,
219 )?))
220 }
221
222 fn statistics(&self) -> Result<Statistics> {
223 let batch = self.data();
224 Ok(common::compute_record_batch_statistics(
225 &[batch],
226 #[allow(deprecated)]
227 &self.schema,
228 None,
229 ))
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::lit;
    use crate::test::{self, make_partition};

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::stats::{ColumnStatistics, Precision};

    // Nothing here awaits, so a plain synchronous test is sufficient.
    #[test]
    fn values_empty_case() {
        let schema = test::aggr_test_schema();
        #[allow(deprecated)]
        let err = ValuesExec::try_new(schema, vec![]).unwrap_err();
        assert!(err.to_string().contains("Values list cannot be empty"));
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        #[allow(deprecated)]
        let exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
        #[allow(deprecated)]
        let n_batches = exec.data().len();
        assert_eq!(n_batches, 2);
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        #[allow(deprecated)]
        let err = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
        assert!(err.to_string().contains("Values list cannot be empty"));
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        #[allow(deprecated)]
        let err = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
        assert!(err.to_string().contains("Batch has invalid schema"));
    }

    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        // A non-null value fits the non-nullable column.
        #[allow(deprecated)]
        let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap();
        // A NULL value in the non-nullable column must be rejected.
        #[allow(deprecated)]
        let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
            .unwrap_err();
    }

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let rows = 3;
        let data = vec![vec![lit(ScalarValue::Null)]; rows];
        #[allow(deprecated)]
        let values = ValuesExec::try_new(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        #[allow(deprecated)]
        let stats = values.statistics()?;
        assert_eq!(
            stats,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    // Every row is NULL.
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                }],
            }
        );

        Ok(())
    }
}