datafusion_datasource/memory.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading in-memory batches of data
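//!
//! A minimal usage sketch (illustrative, not compiled as a doctest); `batch`
//! is an assumed pre-built `RecordBatch`:
//!
//! ```ignore
//! use datafusion_datasource::memory::MemorySourceConfig;
//!
//! // One partition holding a single batch; `None` means no projection.
//! let schema = batch.schema();
//! let exec = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;
//! // `exec` is an `Arc<DataSourceExec>` and can be executed like any plan.
//! ```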

use std::any::Any;
use std::fmt;
use std::sync::Arc;

use crate::source::{DataSource, DataSourceExec};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
};
use datafusion_physical_plan::{
    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
    internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

/// Execution plan for reading in-memory batches of data
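///
/// Deprecated in favor of [`MemorySourceConfig`] wrapped in a
/// [`DataSourceExec`]. A minimal migration sketch, assuming the same
/// `partitions` and `schema` arguments (illustrative, not a doctest):
///
/// ```ignore
/// // Before: MemoryExec::try_new(&partitions, schema, None)?
/// // After:
/// let plan = MemorySourceConfig::try_new_exec(&partitions, schema, None)?;
/// ```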
#[derive(Clone)]
#[deprecated(
    since = "46.0.0",
    note = "use MemorySourceConfig and DataSourceExec instead"
)]
pub struct MemoryExec {
    inner: DataSourceExec,
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// if partition sizes should be displayed
    show_sizes: bool,
}

#[allow(unused, deprecated)]
impl fmt::Debug for MemoryExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(DisplayFormatType::Default, f)
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for MemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for MemoryExec {
    fn name(&self) -> &'static str {
        "MemoryExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // This is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // MemoryExec has no children
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in {self:?}")
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    /// We recompute the statistics dynamically from the arrow metadata as it is pretty cheap to do so
    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}

#[allow(unused, deprecated)]
impl MemoryExec {
    /// Create a new execution plan for reading in-memory record batches.
    ///
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let source = MemorySourceConfig::try_new(partitions, schema, projection.clone())?;
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions: partitions.to_vec(),
            projection,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    /// Create a new execution plan from a list of constant values (`ValuesExec`)
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Self> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // Use a zero-column batch with a row count of one as a placeholder;
        // evaluating a constant expression against it yields exactly one value
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Create a new plan using the provided schema and batches.
    ///
    /// Errors if any of the batches don't match the provided schema, or if no
    /// batches are provided.
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = MemorySourceConfig {
            partitions: partitions.clone(),
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions,
            projection: None,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    fn memory_source_config(&self) -> MemorySourceConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
            .unwrap()
            .clone()
    }

    /// Set the constraints for this plan
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.inner = self.inner.with_constraints(constraints);
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        let mut memory_source = self.memory_source_config();
        memory_source.show_sizes = show_sizes;
        self.show_sizes = show_sizes;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        self
    }

    /// Ref to constraints
    pub fn constraints(&self) -> &Constraints {
        self.properties().equivalence_properties().constraints()
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously.
    /// [`EquivalenceProperties`] keeps track of expressions that describe the
    /// global ordering of the schema. These columns are not necessarily the same; e.g.
    /// ```text
    /// ┌-------┐
    /// | a | b |
    /// |---|---|
    /// | 1 | 9 |
    /// | 2 | 8 |
    /// | 3 | 7 |
    /// | 5 | 5 |
    /// └---┴---┘
    /// ```
    /// where both `a ASC` and `b DESC` can describe the table ordering. With
    /// [`EquivalenceProperties`], we can keep track of these equivalences
    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
    ///
    /// Note that if there is an internal projection, that projection will
    /// also be applied to the given `sort_information`.
    pub fn try_with_sort_information(
        mut self,
        sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        self.sort_information = sort_information.clone();
        let mut memory_source = self.memory_source_config();
        memory_source = memory_source.try_with_sort_information(sort_information)?;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        Ok(self)
    }

    /// Arc clone of ref to original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.inner.schema())
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        partitions: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new_with_orderings(schema, orderings)
                .with_constraints(constraints),
            Partitioning::UnknownPartitioning(partitions.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

/// Data source configuration for reading in-memory batches of data
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// if partition sizes should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    fetch: Option<usize>,
}

impl DataSource for MemorySourceConfig {
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| {
                        format!(", output_ordering={}", output_ordering)
                    })
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {}", constraints)
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={}", limit));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
        }
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.as_slice(),
        )
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = self.clone();
        Some(Arc::new(source.with_limit(limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If there is any non-column or alias-carrying expression, the projection cannot be removed.
        // This check could be moved into MemoryExec, but that would overlap the responsibilities of the two.
        all_alias_free_columns(projection.expr())
            .then(|| {
                let all_projections = (0..self.schema.fields().len()).collect();
                let new_projections = new_projections_for_columns(
                    projection,
                    self.projection().as_ref().unwrap_or(&all_projections),
                );

                MemorySourceConfig::try_new_exec(
                    self.partitions(),
                    self.original_schema(),
                    Some(new_projections),
                )
                .map(|e| e as _)
            })
            .transpose()
    }
}

impl MemorySourceConfig {
    /// Create a new `MemorySourceConfig` for reading in-memory record batches.
    ///
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Create a new `DataSourceExec` plan for reading in-memory record batches.
    ///
    /// The provided `schema` should not have the projection applied.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<DataSourceExec>> {
        let source = Self::try_new(partitions, schema, projection)?;
        Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
    }

    /// Create a new execution plan from a list of constant values (`ValuesExec`)
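    ///
    /// A minimal sketch of the expected input shape (illustrative, not a
    /// doctest); `schema` is assumed to describe one Int64 column and `lit`
    /// builds a literal physical expression:
    ///
    /// ```ignore
    /// // Equivalent to SQL `VALUES (1), (2)`: two rows, one column
    /// let data = vec![vec![lit(1i64)], vec![lit(2i64)]];
    /// let plan = MemorySourceConfig::try_new_as_values(schema, data)?;
    /// ```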
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Arc<DataSourceExec>> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // Use a zero-column batch with a row count of one as a placeholder;
        // evaluating a constant expression against it yields exactly one value
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Create a new plan using the provided schema and batches.
    ///
    /// Errors if any of the batches don't match the provided schema, or if no
    /// batches are provided.
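    ///
    /// A minimal usage sketch (illustrative, not a doctest); `batch` is an
    /// assumed pre-built `RecordBatch`:
    ///
    /// ```ignore
    /// let schema = batch.schema();
    /// // All provided batches become a single partition.
    /// let plan = MemorySourceConfig::try_new_from_batches(schema, vec![batch])?;
    /// ```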
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Arc<DataSourceExec>> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = Self {
            partitions,
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
    }

    /// Set the maximum number of records to read from this source (`None` reads all records)
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        self.show_sizes = show_sizes;
        self
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously.
    /// [`EquivalenceProperties`] keeps track of expressions that describe the
    /// global ordering of the schema. These columns are not necessarily the same; e.g.
    /// ```text
    /// ┌-------┐
    /// | a | b |
    /// |---|---|
    /// | 1 | 9 |
    /// | 2 | 8 |
    /// | 3 | 7 |
    /// | 5 | 5 |
    /// └---┴---┘
    /// ```
    /// where both `a ASC` and `b DESC` can describe the table ordering. With
    /// [`EquivalenceProperties`], we can keep track of these equivalences
    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
    ///
    /// Note that if there is an internal projection, that projection will
    /// also be applied to the given `sort_information`.
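    ///
    /// A minimal sketch of declaring a single-column ordering, mirroring the
    /// tests below (illustrative, not a doctest); `col` resolves a column
    /// against `schema`:
    ///
    /// ```ignore
    /// let ordering = LexOrdering::new(vec![PhysicalSortExpr {
    ///     expr: col("a", &schema)?,
    ///     options: SortOptions::default(),
    /// }]);
    /// let source = source.try_with_sort_information(vec![ordering])?;
    /// ```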
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // All sort expressions must refer to the original schema
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the MemorySourceConfig",
                col
            );
        }

        // If there is a projection on the source, we also need to project orderings
        if let Some(projection) = &self.projection {
            let base_eqp = EquivalenceProperties::new_with_orderings(
                self.original_schema(),
                &sort_information,
            );
            let proj_exprs = projection
                .iter()
                .map(|idx| {
                    let base_schema = self.original_schema();
                    let name = base_schema.field(*idx).name();
                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
                })
                .collect::<Vec<_>>();
            let projection_mapping =
                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
            sort_information = base_eqp
                .project(&projection_mapping, Arc::clone(&self.projected_schema))
                .into_oeq_class()
                .into_inner();
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// Arc clone of ref to original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1 = LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]);
        let sort2 = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort1.clone());
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        )));

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    // Test issue: https://github.com/apache/datafusion/issues/8763
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // Test that a null value is rejected
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8), // not important
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows), // there are only nulls
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
892}