datafusion_physical_plan/test.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for testing datafusion-physical-plan

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;

use crate::ExecutionPlan;
use crate::common;
use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
use crate::streaming::PartitionStream;
use crate::{DisplayAs, DisplayFormatType, PlanProperties};

use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{
    Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::equivalence::{
    OrderingEquivalenceClass, ProjectionMapping,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};

use futures::{Future, FutureExt};

pub mod exec;
/// `TestMemoryExec` is a mock equivalent of [`MemorySourceConfig`] that implements
/// [`ExecutionPlan`] for testing; it has some, but not all, of the functionality of
/// [`MemorySourceConfig`]. It implements an in-memory data source directly rather
/// than through a trait, which keeps the relevant unit tests in this crate while
/// avoiding a circular dependency between `datafusion-physical-plan` and
/// `datafusion-datasource`.
///
/// [`MemorySourceConfig`]: https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
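///
/// A minimal usage sketch (fenced as `ignore` so it is not run as a doctest;
/// `build_table_i32` is the helper defined later in this module):
///
/// ```ignore
/// // One partition containing a single three-column, two-row batch.
/// let batch = build_table_i32(("a", &vec![1, 2]), ("b", &vec![3, 4]), ("c", &vec![5, 6]));
/// let schema = batch.schema();
/// let exec = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
/// assert_eq!(exec.partitions().len(), 1);
/// ```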
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    fetch: Option<usize>,
    cache: PlanProperties,
}

impl DisplayAs for TestMemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(f, "DataSourceExec: ")?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {constraints}")
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}
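
// For reference: with a single partition holding one batch and no fetch limit,
// ordering, or constraints, the `Default` format above renders as, e.g.,
// `DataSourceExec: partitions=1, partition_sizes=[1]` (each partition "size"
// here is its batch count).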

impl ExecutionPlan for TestMemoryExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        unimplemented!()
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.open(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        unimplemented!()
    }

    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            Ok(Statistics::new_unknown(&self.schema))
        } else {
            self.statistics_inner()
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

impl TestMemoryExec {
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn compute_properties(&self) -> PlanProperties {
        PlanProperties::new(
            self.eq_properties(),
            self.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.clone(),
        )
    }

    fn statistics_inner(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            cache: PlanProperties::new(
                EquivalenceProperties::new_with_orderings(
                    Arc::clone(&projected_schema),
                    Vec::<LexOrdering>::new(),
                ),
                Partitioning::UnknownPartitioning(partitions.len()),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Create a new `DataSourceExec`-equivalent plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<TestMemoryExec>> {
        let mut source = Self::try_new(partitions, schema, projection)?;
        let cache = source.compute_properties();
        source.cache = cache;
        Ok(Arc::new(source))
    }

    /// Equivalent of `DataSourceExec::new`
    pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
        let cache = source.compute_properties();
        let mut source = (**source).clone();
        source.cache = cache;
        source
    }

    /// Set the fetch limit: the maximum number of rows to return
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// See `try_with_sort_information` on `MemorySourceConfig` for more information:
    /// <https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs>
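    ///
    /// A hypothetical sketch (fenced as `ignore`: the exact `PhysicalSortExpr` /
    /// `LexOrdering` constructors vary across `datafusion-physical-expr` versions):
    ///
    /// ```ignore
    /// // An ordering over a column of the *original* (pre-projection) schema.
    /// let ordering: LexOrdering = /* built from a PhysicalSortExpr on column "a" */;
    /// let source = TestMemoryExec::try_new(&[vec![batch]], schema, Some(vec![0]))?
    ///     .try_with_sort_information(vec![ordering])?;
    /// ```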
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // All sort expressions must refer to the original schema
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        assert_or_internal_err!(
            ambiguous_column.is_none(),
            "Column {:?} is not found in the original schema of the TestMemoryExec",
            ambiguous_column.as_ref().unwrap()
        );

        // If there is a projection on the source, we also need to project orderings
        if let Some(projection) = &self.projection {
            let base_schema = self.original_schema();
            let proj_exprs = projection.iter().map(|idx| {
                let name = base_schema.field(*idx).name();
                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
            });
            let projection_mapping =
                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
            let base_eqp = EquivalenceProperties::new_with_orderings(
                Arc::clone(&base_schema),
                sort_information,
            );
            let proj_eqp =
                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
            sort_information = oeq_class.into();
        }

        self.sort_information = sort_information;
        self.cache = self.compute_properties();
        Ok(self)
    }

    /// Returns an `Arc` clone of the original (pre-projection) schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

/// Asserts that the given future is pending.
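///
/// For example (a sketch, not run as a doctest):
///
/// ```ignore
/// let mut fut: Pin<Box<dyn Future<Output = ()> + Send>> =
///     Box::pin(std::future::pending());
/// assert_is_pending(&mut fut);
/// ```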
pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    let poll = fut.poll_unpin(&mut cx);

    assert!(poll.is_pending());
}

/// Get the schema for the aggregate_test_* CSV files
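///
/// For example (a sketch, not run as a doctest):
///
/// ```ignore
/// let schema = aggr_test_schema();
/// assert_eq!(schema.fields().len(), 13); // columns c1..c13
/// ```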
pub fn aggr_test_schema() -> SchemaRef {
    let mut f1 = Field::new("c1", DataType::Utf8, false);
    f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
    let schema = Schema::new(vec![
        f1,
        Field::new("c2", DataType::UInt32, false),
        Field::new("c3", DataType::Int8, false),
        Field::new("c4", DataType::Int16, false),
        Field::new("c5", DataType::Int32, false),
        Field::new("c6", DataType::Int64, false),
        Field::new("c7", DataType::UInt8, false),
        Field::new("c8", DataType::UInt16, false),
        Field::new("c9", DataType::UInt32, false),
        Field::new("c10", DataType::UInt64, false),
        Field::new("c11", DataType::Float32, false),
        Field::new("c12", DataType::Float64, false),
        Field::new("c13", DataType::Utf8, false),
    ]);

    Arc::new(schema)
}

/// Returns an in-memory record batch with three i32 columns
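///
/// For example (a sketch, not run as a doctest):
///
/// ```ignore
/// let batch = build_table_i32(("a", &vec![1, 2]), ("b", &vec![3, 4]), ("c", &vec![5, 6]));
/// assert_eq!(batch.num_columns(), 3);
/// assert_eq!(batch.num_rows(), 2);
/// ```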
pub fn build_table_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
        Field::new(c.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
            Arc::new(Int32Array::from(c.1.clone())),
        ],
    )
    .unwrap()
}

/// Returns an in-memory record batch with two i32 columns
pub fn build_table_i32_two_cols(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
        ],
    )
    .unwrap()
}

/// Returns a memory table scan wrapped around a record batch with three i32 columns
pub fn build_table_scan_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let batch = build_table_i32(a, b, c);
    let schema = batch.schema();
    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}

/// Returns a `RecordBatch` with a single `Int32` array with values `0..sz` in a field named "i"
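///
/// For example (a sketch, not run as a doctest):
///
/// ```ignore
/// let batch = make_partition(3); // column "i" contains [0, 1, 2]
/// assert_eq!(batch.num_rows(), 3);
/// ```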
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Like `make_partition`, but with a single Utf8 field named "i" containing
/// roughly-42-byte strings
pub fn make_partition_utf8(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end)
        .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
        .collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
    let mut string_array = arrow::array::StringArray::from(values);
    string_array.shrink_to_fit();
    let arr = Arc::new(string_array);
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Returns a `DataSourceExec`-equivalent scan over `partitions` partitions, each holding one 100-row batch
pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec(partitions))
}

/// Like `scan_partitioned`, but with Utf8 batches from `make_partition_utf8`
pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec_utf8(partitions))
}

/// Returns a `TestMemoryExec` over `partitions` partitions, each holding one 100-row batch
pub fn mem_exec(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

/// Utf8 variant of `mem_exec`
pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions)
        .map(|_| vec![make_partition_utf8(100)])
        .collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

/// A stream partition over in-memory record batches, for test purposes
#[derive(Debug)]
pub struct TestPartitionStream {
    pub schema: SchemaRef,
    pub batches: Vec<RecordBatch>,
}

impl TestPartitionStream {
    /// Create a new stream partition with the provided batches
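    ///
    /// Panics if `batches` is empty, since the schema is taken from the first
    /// batch. For example (a sketch, not run as a doctest):
    ///
    /// ```ignore
    /// let stream = TestPartitionStream::new_with_batches(vec![make_partition(5)]);
    /// assert_eq!(stream.batches.len(), 1);
    /// ```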
    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
        let schema = batches[0].schema();
        Self { schema, batches }
    }
}

impl PartitionStream for TestPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            stream,
        ))
    }
}

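/// Asserts that the join metrics produced `$expected_rows` output rows and
/// that the recorded `join_time` and `build_time` are accounted for in
/// `elapsed_compute`.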
#[cfg(test)]
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        assert_eq!($metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = $metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = $metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = $metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // ensure join_time and build_time are accounted for in elapsed_compute
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeded elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}

#[cfg(test)]
pub(crate) use assert_join_metrics;