Skip to main content

datafusion_physical_plan/
test.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for testing datafusion-physical-plan
19
20use std::collections::HashMap;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::Context;
26
27use crate::ExecutionPlan;
28use crate::common;
29use crate::execution_plan::{Boundedness, EmissionType};
30use crate::memory::MemoryStream;
31use crate::metrics::MetricsSet;
32use crate::stream::RecordBatchStreamAdapter;
33use crate::streaming::PartitionStream;
34use crate::{DisplayAs, DisplayFormatType, PlanProperties};
35
36use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
37use arrow_schema::{DataType, Field, Schema, SchemaRef};
38use datafusion_common::{
39    Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema,
40};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::equivalence::{
43    OrderingEquivalenceClass, ProjectionMapping,
44};
45use datafusion_physical_expr::expressions::Column;
46use datafusion_physical_expr::utils::collect_columns;
47use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
48
49use futures::{Future, FutureExt};
50
51pub mod exec;
52
53/// `TestMemoryExec` is a mock equivalent to [`MemorySourceConfig`] with [`ExecutionPlan`] implemented for testing.
54/// i.e. It has some but not all the functionality of [`MemorySourceConfig`].
55/// This implements an in-memory DataSource rather than explicitly implementing a trait.
56/// It is implemented in this manner to keep relevant unit tests in place
57/// while avoiding circular dependencies between `datafusion-physical-plan` and `datafusion-datasource`.
58///
59/// [`MemorySourceConfig`]: https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
60#[derive(Clone, Debug)]
61pub struct TestMemoryExec {
62    /// The partitions to query
63    partitions: Vec<Vec<RecordBatch>>,
64    /// Schema representing the data before projection
65    schema: SchemaRef,
66    /// Schema representing the data after the optional projection is applied
67    projected_schema: SchemaRef,
68    /// Optional projection
69    projection: Option<Vec<usize>>,
70    /// Sort information: one or more equivalent orderings
71    sort_information: Vec<LexOrdering>,
72    /// if partition sizes should be displayed
73    show_sizes: bool,
74    /// The maximum number of records to read from this plan. If `None`,
75    /// all records after filtering are returned.
76    fetch: Option<usize>,
77    cache: Arc<PlanProperties>,
78}
79
80impl DisplayAs for TestMemoryExec {
81    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
82        write!(f, "DataSourceExec: ")?;
83        match t {
84            DisplayFormatType::Default | DisplayFormatType::Verbose => {
85                let partition_sizes: Vec<_> =
86                    self.partitions.iter().map(|b| b.len()).collect();
87
88                let output_ordering = self
89                    .sort_information
90                    .first()
91                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
92                    .unwrap_or_default();
93
94                let eq_properties = self.eq_properties();
95                let constraints = eq_properties.constraints();
96                let constraints = if constraints.is_empty() {
97                    String::new()
98                } else {
99                    format!(", {constraints}")
100                };
101
102                let limit = self
103                    .fetch
104                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
105                if self.show_sizes {
106                    write!(
107                        f,
108                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
109                        partition_sizes.len(),
110                    )
111                } else {
112                    write!(
113                        f,
114                        "partitions={}{limit}{output_ordering}{constraints}",
115                        partition_sizes.len(),
116                    )
117                }
118            }
119            DisplayFormatType::TreeRender => {
120                // TODO: collect info
121                write!(f, "")
122            }
123        }
124    }
125}
126
127impl ExecutionPlan for TestMemoryExec {
128    fn name(&self) -> &'static str {
129        "DataSourceExec"
130    }
131
132    fn properties(&self) -> &Arc<PlanProperties> {
133        &self.cache
134    }
135
136    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
137        Vec::new()
138    }
139
140    fn with_new_children(
141        self: Arc<Self>,
142        _: Vec<Arc<dyn ExecutionPlan>>,
143    ) -> Result<Arc<dyn ExecutionPlan>> {
144        Ok(self)
145    }
146
147    fn repartitioned(
148        &self,
149        _target_partitions: usize,
150        _config: &ConfigOptions,
151    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
152        unimplemented!()
153    }
154
155    fn execute(
156        &self,
157        partition: usize,
158        context: Arc<TaskContext>,
159    ) -> Result<SendableRecordBatchStream> {
160        self.open(partition, context)
161    }
162
163    fn metrics(&self) -> Option<MetricsSet> {
164        unimplemented!()
165    }
166
167    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
168        if partition.is_some() {
169            Ok(Arc::new(Statistics::new_unknown(&self.schema)))
170        } else {
171            Ok(Arc::new(self.statistics_inner()?))
172        }
173    }
174
175    fn fetch(&self) -> Option<usize> {
176        self.fetch
177    }
178}
179
180impl TestMemoryExec {
181    fn open(
182        &self,
183        partition: usize,
184        _context: Arc<TaskContext>,
185    ) -> Result<SendableRecordBatchStream> {
186        Ok(Box::pin(
187            MemoryStream::try_new(
188                self.partitions[partition].clone(),
189                Arc::clone(&self.projected_schema),
190                self.projection.clone(),
191            )?
192            .with_fetch(self.fetch),
193        ))
194    }
195
196    fn compute_properties(&self) -> PlanProperties {
197        PlanProperties::new(
198            self.eq_properties(),
199            self.output_partitioning(),
200            EmissionType::Incremental,
201            Boundedness::Bounded,
202        )
203    }
204
205    fn output_partitioning(&self) -> Partitioning {
206        Partitioning::UnknownPartitioning(self.partitions.len())
207    }
208
209    fn eq_properties(&self) -> EquivalenceProperties {
210        EquivalenceProperties::new_with_orderings(
211            Arc::clone(&self.projected_schema),
212            self.sort_information.clone(),
213        )
214    }
215
216    fn statistics_inner(&self) -> Result<Statistics> {
217        Ok(common::compute_record_batch_statistics(
218            &self.partitions,
219            &self.schema,
220            self.projection.clone(),
221        ))
222    }
223
224    pub fn try_new(
225        partitions: &[Vec<RecordBatch>],
226        schema: SchemaRef,
227        projection: Option<Vec<usize>>,
228    ) -> Result<Self> {
229        let projected_schema = project_schema(&schema, projection.as_ref())?;
230        Ok(Self {
231            partitions: partitions.to_vec(),
232            schema,
233            cache: Arc::new(PlanProperties::new(
234                EquivalenceProperties::new_with_orderings(
235                    Arc::clone(&projected_schema),
236                    Vec::<LexOrdering>::new(),
237                ),
238                Partitioning::UnknownPartitioning(partitions.len()),
239                EmissionType::Incremental,
240                Boundedness::Bounded,
241            )),
242            projected_schema,
243            projection,
244            sort_information: vec![],
245            show_sizes: true,
246            fetch: None,
247        })
248    }
249
250    /// Create a new `DataSourceExec` Equivalent plan for reading in-memory record batches
251    /// The provided `schema` should not have the projection applied.
252    pub fn try_new_exec(
253        partitions: &[Vec<RecordBatch>],
254        schema: SchemaRef,
255        projection: Option<Vec<usize>>,
256    ) -> Result<Arc<TestMemoryExec>> {
257        let mut source = Self::try_new(partitions, schema, projection)?;
258        let cache = source.compute_properties();
259        source.cache = Arc::new(cache);
260        Ok(Arc::new(source))
261    }
262
263    // Equivalent of `DataSourceExec::new`
264    pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
265        let cache = source.compute_properties();
266        let mut source = (**source).clone();
267        source.cache = Arc::new(cache);
268        source
269    }
270
271    /// Set the limit of the files
272    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
273        self.fetch = limit;
274        self
275    }
276
277    /// Ref to partitions
278    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
279        &self.partitions
280    }
281
282    /// Ref to projection
283    pub fn projection(&self) -> &Option<Vec<usize>> {
284        &self.projection
285    }
286
287    /// Ref to sort information
288    pub fn sort_information(&self) -> &[LexOrdering] {
289        &self.sort_information
290    }
291
292    /// refer to `try_with_sort_information` at MemorySourceConfig for more information.
293    /// <https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs>
294    pub fn try_with_sort_information(
295        mut self,
296        mut sort_information: Vec<LexOrdering>,
297    ) -> Result<Self> {
298        // All sort expressions must refer to the original schema
299        let fields = self.schema.fields();
300        let ambiguous_column = sort_information
301            .iter()
302            .flat_map(|ordering| ordering.clone())
303            .flat_map(|expr| collect_columns(&expr.expr))
304            .find(|col| {
305                fields
306                    .get(col.index())
307                    .map(|field| field.name() != col.name())
308                    .unwrap_or(true)
309            });
310        assert_or_internal_err!(
311            ambiguous_column.is_none(),
312            "Column {:?} is not found in the original schema of the TestMemoryExec",
313            ambiguous_column.as_ref().unwrap()
314        );
315
316        // If there is a projection on the source, we also need to project orderings
317        if let Some(projection) = &self.projection {
318            let base_schema = self.original_schema();
319            let proj_exprs = projection.iter().map(|idx| {
320                let name = base_schema.field(*idx).name();
321                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
322            });
323            let projection_mapping =
324                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
325            let base_eqp = EquivalenceProperties::new_with_orderings(
326                Arc::clone(&base_schema),
327                sort_information,
328            );
329            let proj_eqp =
330                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
331            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
332            sort_information = oeq_class.into();
333        }
334
335        self.sort_information = sort_information;
336        self.cache = Arc::new(self.compute_properties());
337        Ok(self)
338    }
339
340    /// Arc clone of ref to original schema
341    pub fn original_schema(&self) -> SchemaRef {
342        Arc::clone(&self.schema)
343    }
344}
345
346/// Asserts that given future is pending.
347pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
348    let waker = futures::task::noop_waker();
349    let mut cx = Context::from_waker(&waker);
350    let poll = fut.poll_unpin(&mut cx);
351
352    assert!(poll.is_pending());
353}
354
355/// Get the schema for the aggregate_test_* csv files
356pub fn aggr_test_schema() -> SchemaRef {
357    let mut f1 = Field::new("c1", DataType::Utf8, false);
358    f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
359    let schema = Schema::new(vec![
360        f1,
361        Field::new("c2", DataType::UInt32, false),
362        Field::new("c3", DataType::Int8, false),
363        Field::new("c4", DataType::Int16, false),
364        Field::new("c5", DataType::Int32, false),
365        Field::new("c6", DataType::Int64, false),
366        Field::new("c7", DataType::UInt8, false),
367        Field::new("c8", DataType::UInt16, false),
368        Field::new("c9", DataType::UInt32, false),
369        Field::new("c10", DataType::UInt64, false),
370        Field::new("c11", DataType::Float32, false),
371        Field::new("c12", DataType::Float64, false),
372        Field::new("c13", DataType::Utf8, false),
373    ]);
374
375    Arc::new(schema)
376}
377
378/// Returns record batch with 3 columns of i32 in memory
379pub fn build_table_i32(
380    a: (&str, &Vec<i32>),
381    b: (&str, &Vec<i32>),
382    c: (&str, &Vec<i32>),
383) -> RecordBatch {
384    let schema = Schema::new(vec![
385        Field::new(a.0, DataType::Int32, false),
386        Field::new(b.0, DataType::Int32, false),
387        Field::new(c.0, DataType::Int32, false),
388    ]);
389
390    RecordBatch::try_new(
391        Arc::new(schema),
392        vec![
393            Arc::new(Int32Array::from(a.1.clone())),
394            Arc::new(Int32Array::from(b.1.clone())),
395            Arc::new(Int32Array::from(c.1.clone())),
396        ],
397    )
398    .unwrap()
399}
400
401/// Returns record batch with 2 columns of i32 in memory
402pub fn build_table_i32_two_cols(
403    a: (&str, &Vec<i32>),
404    b: (&str, &Vec<i32>),
405) -> RecordBatch {
406    let schema = Schema::new(vec![
407        Field::new(a.0, DataType::Int32, false),
408        Field::new(b.0, DataType::Int32, false),
409    ]);
410
411    RecordBatch::try_new(
412        Arc::new(schema),
413        vec![
414            Arc::new(Int32Array::from(a.1.clone())),
415            Arc::new(Int32Array::from(b.1.clone())),
416        ],
417    )
418    .unwrap()
419}
420
421/// Returns memory table scan wrapped around record batch with 3 columns of i32
422pub fn build_table_scan_i32(
423    a: (&str, &Vec<i32>),
424    b: (&str, &Vec<i32>),
425    c: (&str, &Vec<i32>),
426) -> Arc<dyn ExecutionPlan> {
427    let batch = build_table_i32(a, b, c);
428    let schema = batch.schema();
429    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
430}
431
432/// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
433pub fn make_partition(sz: i32) -> RecordBatch {
434    let seq_start = 0;
435    let seq_end = sz;
436    let values = (seq_start..seq_end).collect::<Vec<_>>();
437    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
438    let arr = Arc::new(Int32Array::from(values));
439    let arr = arr as ArrayRef;
440
441    RecordBatch::try_new(schema, vec![arr]).unwrap()
442}
443
444pub fn make_partition_utf8(sz: i32) -> RecordBatch {
445    let seq_start = 0;
446    let seq_end = sz;
447    let values = (seq_start..seq_end)
448        .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
449        .collect::<Vec<_>>();
450    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
451    let mut string_array = arrow::array::StringArray::from(values);
452    string_array.shrink_to_fit();
453    let arr = Arc::new(string_array);
454    let arr = arr as ArrayRef;
455
456    RecordBatch::try_new(schema, vec![arr]).unwrap()
457}
458
459/// Returns a `DataSourceExec` that scans `partitions` of 100 batches each
460pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
461    Arc::new(mem_exec(partitions))
462}
463
464pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
465    Arc::new(mem_exec_utf8(partitions))
466}
467
468/// Returns a `DataSourceExec` that scans `partitions` of 100 batches each
469pub fn mem_exec(partitions: usize) -> TestMemoryExec {
470    let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();
471
472    let schema = data[0][0].schema();
473    let projection = None;
474
475    TestMemoryExec::try_new(&data, schema, projection).unwrap()
476}
477
478pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
479    let data: Vec<Vec<_>> = (0..partitions)
480        .map(|_| vec![make_partition_utf8(100)])
481        .collect();
482
483    let schema = data[0][0].schema();
484    let projection = None;
485
486    TestMemoryExec::try_new(&data, schema, projection).unwrap()
487}
488
489// Construct a stream partition for test purposes
490#[derive(Debug)]
491pub struct TestPartitionStream {
492    pub schema: SchemaRef,
493    pub batches: Vec<RecordBatch>,
494}
495
496impl TestPartitionStream {
497    /// Create a new stream partition with the provided batches
498    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
499        let schema = batches[0].schema();
500        Self { schema, batches }
501    }
502}
503impl PartitionStream for TestPartitionStream {
504    fn schema(&self) -> &SchemaRef {
505        &self.schema
506    }
507    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
508        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
509        Box::pin(RecordBatchStreamAdapter::new(
510            Arc::clone(&self.schema),
511            stream,
512        ))
513    }
514}
515
/// Asserts basic invariants of a join operator's metrics.
///
/// `$metrics` must provide `output_rows()`, `elapsed_compute()` and
/// `sum_by_name(..)`. The macro checks that the output row count equals
/// `$expected_rows` and that the `join_time` and `build_time` metrics
/// together do not exceed `elapsed_compute`.
///
/// Panics if a metric is missing or a check fails.
#[cfg(test)]
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        // Evaluate the metrics expression only once
        let metrics = &$metrics;
        assert_eq!(metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // ensure join_time and build_time are considered in elapsed_compute
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeds elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}
543#[cfg(test)]
544pub(crate) use assert_join_metrics;