datafusion_physical_plan/
test.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for testing datafusion-physical-plan
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::{Debug, Formatter};
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27
28use crate::common;
29use crate::execution_plan::{Boundedness, EmissionType};
30use crate::memory::MemoryStream;
31use crate::metrics::MetricsSet;
32use crate::stream::RecordBatchStreamAdapter;
33use crate::streaming::PartitionStream;
34use crate::ExecutionPlan;
35use crate::{DisplayAs, DisplayFormatType, PlanProperties};
36
37use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
38use arrow_schema::{DataType, Field, Schema, SchemaRef};
39use datafusion_common::{
40    config::ConfigOptions, internal_err, project_schema, Result, Statistics,
41};
42use datafusion_execution::{SendableRecordBatchStream, TaskContext};
43use datafusion_physical_expr::{
44    equivalence::ProjectionMapping, expressions::Column, utils::collect_columns,
45    EquivalenceProperties, LexOrdering, Partitioning,
46};
47
48use futures::{Future, FutureExt};
49
50pub mod exec;
51
52/// `TestMemoryExec` is a mock equivalent to [`MemorySourceConfig`] with [`ExecutionPlan`] implemented for testing.
53/// i.e. It has some but not all the functionality of [`MemorySourceConfig`].
54/// This implements an in-memory DataSource rather than explicitly implementing a trait.
55/// It is implemented in this manner to keep relevant unit tests in place
56/// while avoiding circular dependencies between `datafusion-physical-plan` and `datafusion-datasource`.
57///
58/// [`MemorySourceConfig`]: https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// if partition sizes should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    fetch: Option<usize>,
    /// Cached plan properties returned by `ExecutionPlan::properties`;
    /// computed from the fields above via `compute_properties` (see
    /// `try_new_exec` / `update_cache`).
    cache: PlanProperties,
}
78
79impl DisplayAs for TestMemoryExec {
80    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
81        write!(f, "DataSourceExec: ")?;
82        match t {
83            DisplayFormatType::Default | DisplayFormatType::Verbose => {
84                let partition_sizes: Vec<_> =
85                    self.partitions.iter().map(|b| b.len()).collect();
86
87                let output_ordering = self
88                    .sort_information
89                    .first()
90                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
91                    .unwrap_or_default();
92
93                let eq_properties = self.eq_properties();
94                let constraints = eq_properties.constraints();
95                let constraints = if constraints.is_empty() {
96                    String::new()
97                } else {
98                    format!(", {constraints}")
99                };
100
101                let limit = self
102                    .fetch
103                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
104                if self.show_sizes {
105                    write!(
106                                f,
107                                "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
108                                partition_sizes.len(),
109                            )
110                } else {
111                    write!(
112                        f,
113                        "partitions={}{limit}{output_ordering}{constraints}",
114                        partition_sizes.len(),
115                    )
116                }
117            }
118            DisplayFormatType::TreeRender => {
119                // TODO: collect info
120                write!(f, "")
121            }
122        }
123    }
124}
125
impl ExecutionPlan for TestMemoryExec {
    // Reports the same name as the real `DataSourceExec` so that display
    // output produced in tests matches production plans.
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    // Not supported by this test mock.
    fn as_any(&self) -> &dyn Any {
        unimplemented!()
    }

    // Returns the properties computed by `compute_properties` and stored in
    // `cache` (see `try_new_exec` / `update_cache`).
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    // Leaf node: an in-memory source has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    // Not supported by this test mock.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    // Not supported by this test mock.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        unimplemented!()
    }

    // Streams the batches of the requested partition (see `open`).
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.open(partition, context)
    }

    // Not supported by this test mock.
    fn metrics(&self) -> Option<MetricsSet> {
        unimplemented!()
    }

    // Plan-wide statistics, computed over all partitions.
    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    // Per-partition statistics are not tracked by this mock: any specific
    // partition reports unknown statistics, while `None` (whole plan) falls
    // back to the plan-wide computation.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            Ok(Statistics::new_unknown(&self.schema))
        } else {
            self.statistics_inner()
        }
    }

    // Optional row-count limit applied when streaming (see `open`).
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}
186
187impl TestMemoryExec {
188    fn open(
189        &self,
190        partition: usize,
191        _context: Arc<TaskContext>,
192    ) -> Result<SendableRecordBatchStream> {
193        Ok(Box::pin(
194            MemoryStream::try_new(
195                self.partitions[partition].clone(),
196                Arc::clone(&self.projected_schema),
197                self.projection.clone(),
198            )?
199            .with_fetch(self.fetch),
200        ))
201    }
202
203    fn compute_properties(&self) -> PlanProperties {
204        PlanProperties::new(
205            self.eq_properties(),
206            self.output_partitioning(),
207            EmissionType::Incremental,
208            Boundedness::Bounded,
209        )
210    }
211
212    fn output_partitioning(&self) -> Partitioning {
213        Partitioning::UnknownPartitioning(self.partitions.len())
214    }
215
216    fn eq_properties(&self) -> EquivalenceProperties {
217        EquivalenceProperties::new_with_orderings(
218            Arc::clone(&self.projected_schema),
219            self.sort_information.as_slice(),
220        )
221    }
222
223    fn statistics_inner(&self) -> Result<Statistics> {
224        Ok(common::compute_record_batch_statistics(
225            &self.partitions,
226            &self.schema,
227            self.projection.clone(),
228        ))
229    }
230
231    pub fn try_new(
232        partitions: &[Vec<RecordBatch>],
233        schema: SchemaRef,
234        projection: Option<Vec<usize>>,
235    ) -> Result<Self> {
236        let projected_schema = project_schema(&schema, projection.as_ref())?;
237        Ok(Self {
238            partitions: partitions.to_vec(),
239            schema,
240            cache: PlanProperties::new(
241                EquivalenceProperties::new_with_orderings(
242                    Arc::clone(&projected_schema),
243                    vec![].as_slice(),
244                ),
245                Partitioning::UnknownPartitioning(partitions.len()),
246                EmissionType::Incremental,
247                Boundedness::Bounded,
248            ),
249            projected_schema,
250            projection,
251            sort_information: vec![],
252            show_sizes: true,
253            fetch: None,
254        })
255    }
256
257    /// Create a new `DataSourceExec` Equivalent plan for reading in-memory record batches
258    /// The provided `schema` should not have the projection applied.
259    pub fn try_new_exec(
260        partitions: &[Vec<RecordBatch>],
261        schema: SchemaRef,
262        projection: Option<Vec<usize>>,
263    ) -> Result<Arc<TestMemoryExec>> {
264        let mut source = Self::try_new(partitions, schema, projection)?;
265        let cache = source.compute_properties();
266        source.cache = cache;
267        Ok(Arc::new(source))
268    }
269
270    // Equivalent of `DataSourceExec::new`
271    pub fn update_cache(source: Arc<TestMemoryExec>) -> TestMemoryExec {
272        let cache = source.compute_properties();
273        let source = &*source;
274        let mut source = source.clone();
275        source.cache = cache;
276        source
277    }
278
279    /// Set the limit of the files
280    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
281        self.fetch = limit;
282        self
283    }
284
285    /// Ref to partitions
286    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
287        &self.partitions
288    }
289
290    /// Ref to projection
291    pub fn projection(&self) -> &Option<Vec<usize>> {
292        &self.projection
293    }
294
295    /// Ref to sort information
296    pub fn sort_information(&self) -> &[LexOrdering] {
297        &self.sort_information
298    }
299
300    /// refer to `try_with_sort_information` at MemorySourceConfig for more information.
301    /// https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
302    pub fn try_with_sort_information(
303        mut self,
304        mut sort_information: Vec<LexOrdering>,
305    ) -> Result<Self> {
306        // All sort expressions must refer to the original schema
307        let fields = self.schema.fields();
308        let ambiguous_column = sort_information
309            .iter()
310            .flat_map(|ordering| ordering.clone())
311            .flat_map(|expr| collect_columns(&expr.expr))
312            .find(|col| {
313                fields
314                    .get(col.index())
315                    .map(|field| field.name() != col.name())
316                    .unwrap_or(true)
317            });
318        if let Some(col) = ambiguous_column {
319            return internal_err!(
320                "Column {:?} is not found in the original schema of the TestMemoryExec",
321                col
322            );
323        }
324
325        // If there is a projection on the source, we also need to project orderings
326        if let Some(projection) = &self.projection {
327            let base_eqp = EquivalenceProperties::new_with_orderings(
328                self.original_schema(),
329                &sort_information,
330            );
331            let proj_exprs = projection
332                .iter()
333                .map(|idx| {
334                    let base_schema = self.original_schema();
335                    let name = base_schema.field(*idx).name();
336                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
337                })
338                .collect::<Vec<_>>();
339            let projection_mapping =
340                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
341            sort_information = base_eqp
342                .project(&projection_mapping, Arc::clone(&self.projected_schema))
343                .into_oeq_class()
344                .into_inner();
345        }
346
347        self.sort_information = sort_information;
348        Ok(self)
349    }
350
351    /// Arc clone of ref to original schema
352    pub fn original_schema(&self) -> SchemaRef {
353        Arc::clone(&self.schema)
354    }
355}
356
357/// Asserts that given future is pending.
358pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
359    let waker = futures::task::noop_waker();
360    let mut cx = Context::from_waker(&waker);
361    let poll = fut.poll_unpin(&mut cx);
362
363    assert!(poll.is_pending());
364}
365
366/// Get the schema for the aggregate_test_* csv files
367pub fn aggr_test_schema() -> SchemaRef {
368    let mut f1 = Field::new("c1", DataType::Utf8, false);
369    f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
370    let schema = Schema::new(vec![
371        f1,
372        Field::new("c2", DataType::UInt32, false),
373        Field::new("c3", DataType::Int8, false),
374        Field::new("c4", DataType::Int16, false),
375        Field::new("c5", DataType::Int32, false),
376        Field::new("c6", DataType::Int64, false),
377        Field::new("c7", DataType::UInt8, false),
378        Field::new("c8", DataType::UInt16, false),
379        Field::new("c9", DataType::UInt32, false),
380        Field::new("c10", DataType::UInt64, false),
381        Field::new("c11", DataType::Float32, false),
382        Field::new("c12", DataType::Float64, false),
383        Field::new("c13", DataType::Utf8, false),
384    ]);
385
386    Arc::new(schema)
387}
388
389/// Returns record batch with 3 columns of i32 in memory
390pub fn build_table_i32(
391    a: (&str, &Vec<i32>),
392    b: (&str, &Vec<i32>),
393    c: (&str, &Vec<i32>),
394) -> RecordBatch {
395    let schema = Schema::new(vec![
396        Field::new(a.0, DataType::Int32, false),
397        Field::new(b.0, DataType::Int32, false),
398        Field::new(c.0, DataType::Int32, false),
399    ]);
400
401    RecordBatch::try_new(
402        Arc::new(schema),
403        vec![
404            Arc::new(Int32Array::from(a.1.clone())),
405            Arc::new(Int32Array::from(b.1.clone())),
406            Arc::new(Int32Array::from(c.1.clone())),
407        ],
408    )
409    .unwrap()
410}
411
412/// Returns record batch with 2 columns of i32 in memory
413pub fn build_table_i32_two_cols(
414    a: (&str, &Vec<i32>),
415    b: (&str, &Vec<i32>),
416) -> RecordBatch {
417    let schema = Schema::new(vec![
418        Field::new(a.0, DataType::Int32, false),
419        Field::new(b.0, DataType::Int32, false),
420    ]);
421
422    RecordBatch::try_new(
423        Arc::new(schema),
424        vec![
425            Arc::new(Int32Array::from(a.1.clone())),
426            Arc::new(Int32Array::from(b.1.clone())),
427        ],
428    )
429    .unwrap()
430}
431
432/// Returns memory table scan wrapped around record batch with 3 columns of i32
433pub fn build_table_scan_i32(
434    a: (&str, &Vec<i32>),
435    b: (&str, &Vec<i32>),
436    c: (&str, &Vec<i32>),
437) -> Arc<dyn ExecutionPlan> {
438    let batch = build_table_i32(a, b, c);
439    let schema = batch.schema();
440    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
441}
442
443/// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
444pub fn make_partition(sz: i32) -> RecordBatch {
445    let seq_start = 0;
446    let seq_end = sz;
447    let values = (seq_start..seq_end).collect::<Vec<_>>();
448    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
449    let arr = Arc::new(Int32Array::from(values));
450    let arr = arr as ArrayRef;
451
452    RecordBatch::try_new(schema, vec![arr]).unwrap()
453}
454
455pub fn make_partition_utf8(sz: i32) -> RecordBatch {
456    let seq_start = 0;
457    let seq_end = sz;
458    let values = (seq_start..seq_end)
459        .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
460        .collect::<Vec<_>>();
461    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
462    let mut string_array = arrow::array::StringArray::from(values);
463    string_array.shrink_to_fit();
464    let arr = Arc::new(string_array);
465    let arr = arr as ArrayRef;
466
467    RecordBatch::try_new(schema, vec![arr]).unwrap()
468}
469
470/// Returns a `DataSourceExec` that scans `partitions` of 100 batches each
471pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
472    Arc::new(mem_exec(partitions))
473}
474
475pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
476    Arc::new(mem_exec_utf8(partitions))
477}
478
479/// Returns a `DataSourceExec` that scans `partitions` of 100 batches each
480pub fn mem_exec(partitions: usize) -> TestMemoryExec {
481    let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();
482
483    let schema = data[0][0].schema();
484    let projection = None;
485
486    TestMemoryExec::try_new(&data, schema, projection).unwrap()
487}
488
489pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
490    let data: Vec<Vec<_>> = (0..partitions)
491        .map(|_| vec![make_partition_utf8(100)])
492        .collect();
493
494    let schema = data[0][0].schema();
495    let projection = None;
496
497    TestMemoryExec::try_new(&data, schema, projection).unwrap()
498}
499
// Construct a stream partition for test purposes
#[derive(Debug)]
pub struct TestPartitionStream {
    /// Schema shared by all batches in this partition
    pub schema: SchemaRef,
    /// Batches replayed (cloned) on each call to `execute`
    pub batches: Vec<RecordBatch>,
}
506
507impl TestPartitionStream {
508    /// Create a new stream partition with the provided batches
509    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
510        let schema = batches[0].schema();
511        Self { schema, batches }
512    }
513}
514impl PartitionStream for TestPartitionStream {
515    fn schema(&self) -> &SchemaRef {
516        &self.schema
517    }
518    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
519        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
520        Box::pin(RecordBatchStreamAdapter::new(
521            Arc::clone(&self.schema),
522            stream,
523        ))
524    }
525}