datafusion_datasource/memory.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading in-memory batches of data

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use crate::sink::DataSink;
use crate::source::{DataSource, DataSourceExec};
use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
};
use datafusion_physical_plan::{
    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
    internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use futures::StreamExt;
use tokio::sync::RwLock;

/// Execution plan for reading in-memory batches of data
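///
/// A minimal migration sketch away from this deprecated type, assuming `batch`
/// is a `RecordBatch` already in scope (the bindings here are illustrative,
/// not from this file):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_datasource::memory::MemorySourceConfig;
///
/// let schema = batch.schema();
/// // Before: MemoryExec::try_new(&[vec![batch]], schema, None)?
/// // After: build a DataSourceExec from a MemorySourceConfig
/// let plan = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;
/// ```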
#[derive(Clone)]
#[deprecated(
    since = "46.0.0",
    note = "use MemorySourceConfig and DataSourceExec instead"
)]
pub struct MemoryExec {
    inner: DataSourceExec,
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// If partition sizes should be displayed
    show_sizes: bool,
}

#[allow(unused, deprecated)]
impl Debug for MemoryExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(DisplayFormatType::Default, f)
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for MemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for MemoryExec {
    fn name(&self) -> &'static str {
        "MemoryExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // This is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // MemoryExec has no children
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in {self:?}")
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    /// We recompute the statistics dynamically from the arrow metadata as it is pretty cheap to do so
    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}

#[allow(unused, deprecated)]
impl MemoryExec {
    /// Create a new execution plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let source = MemorySourceConfig::try_new(partitions, schema, projection.clone())?;
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions: partitions.to_vec(),
            projection,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    /// Create a new execution plan from a list of constant values (`ValuesExec`)
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Self> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // We use this single-row batch as a placeholder to satisfy the evaluation
        // argument and to generate a single output row
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Create a new plan using the provided schema and batches.
    ///
    /// Errors if any of the batches don't match the provided schema, or if no
    /// batches are provided.
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = MemorySourceConfig {
            partitions: partitions.clone(),
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions,
            projection: None,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    fn memory_source_config(&self) -> MemorySourceConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
            .unwrap()
            .clone()
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.inner = self.inner.with_constraints(constraints);
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        let mut memory_source = self.memory_source_config();
        memory_source.show_sizes = show_sizes;
        self.show_sizes = show_sizes;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        self
    }

    /// Ref to constraints
    pub fn constraints(&self) -> &Constraints {
        self.properties().equivalence_properties().constraints()
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously.
    /// [`EquivalenceProperties`] keeps track of expressions that describe the
    /// global ordering of the schema. These columns are not necessarily the same; e.g.
    /// ```text
    /// ┌-------┐
    /// | a | b |
    /// |---|---|
    /// | 1 | 9 |
    /// | 2 | 8 |
    /// | 3 | 7 |
    /// | 5 | 5 |
    /// └---┴---┘
    /// ```
    /// where both `a ASC` and `b DESC` can describe the table ordering. With
    /// [`EquivalenceProperties`], we can keep track of these equivalences
    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
    ///
    /// Note that if there is an internal projection, that projection will also
    /// be applied to the given `sort_information`.
    pub fn try_with_sort_information(
        mut self,
        sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        self.sort_information = sort_information.clone();
        let mut memory_source = self.memory_source_config();
        memory_source = memory_source.try_with_sort_information(sort_information)?;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        Ok(self)
    }

    /// Arc clone of the original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.inner.schema())
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        partitions: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new_with_orderings(schema, orderings)
                .with_constraints(constraints),
            Partitioning::UnknownPartitioning(partitions.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

/// Data source configuration for reading in-memory batches of data
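///
/// A short construction sketch (the `batch` and `schema` bindings are assumed,
/// not from this file), using the optional projection to keep only column 0:
///
/// ```ignore
/// let source = MemorySourceConfig::try_new(&[vec![batch]], schema, Some(vec![0]))?;
/// let exec = DataSourceExec::from_data_source(source);
/// ```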
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// If partition sizes should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    fetch: Option<usize>,
}

impl DataSource for MemorySourceConfig {
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| {
                        format!(", output_ordering={}", output_ordering)
                    })
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {}", constraints)
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={}", limit));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
            DisplayFormatType::TreeRender => {
                let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
                let total_bytes: usize = self
                    .partitions
                    .iter()
                    .flatten()
                    .map(|batch| batch.get_array_memory_size())
                    .sum();
                writeln!(f, "format=memory")?;
                writeln!(f, "rows={total_rows}")?;
                writeln!(f, "bytes={total_bytes}")?;
                Ok(())
            }
        }
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.as_slice(),
        )
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = self.clone();
        Some(Arc::new(source.with_limit(limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If there is any non-column or alias-carrying expression, the projection
        // should not be removed. This process could be moved into MemoryExec, but
        // that would overlap with its responsibilities.
        all_alias_free_columns(projection.expr())
            .then(|| {
                let all_projections = (0..self.schema.fields().len()).collect();
                let new_projections = new_projections_for_columns(
                    projection,
                    self.projection().as_ref().unwrap_or(&all_projections),
                );

                MemorySourceConfig::try_new_exec(
                    self.partitions(),
                    self.original_schema(),
                    Some(new_projections),
                )
                .map(|e| e as _)
            })
            .transpose()
    }
}

impl MemorySourceConfig {
    /// Create a new `MemorySourceConfig` for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Create a new `DataSourceExec` plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<DataSourceExec>> {
        let source = Self::try_new(partitions, schema, projection)?;
        Ok(DataSourceExec::from_data_source(source))
    }

    /// Create a new execution plan from a list of constant values (`ValuesExec`)
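    ///
    /// Each inner `Vec` is one row of constant physical expressions. A hedged
    /// sketch of a two-row, one-column values list, assuming `schema` has a
    /// single `Int32` column and `lit` comes from
    /// `datafusion_physical_expr::expressions`:
    ///
    /// ```ignore
    /// // Equivalent to SQL: VALUES (1), (2)
    /// let exec = MemorySourceConfig::try_new_as_values(
    ///     Arc::clone(&schema),
    ///     vec![vec![lit(1i32)], vec![lit(2i32)]],
    /// )?;
    /// ```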
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Arc<DataSourceExec>> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // We use this single-row batch as a placeholder to satisfy the evaluation
        // argument and to generate a single output row
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Create a new plan using the provided schema and batches.
    ///
    /// Errors if any of the batches don't match the provided schema, or if no
    /// batches are provided.
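    ///
    /// A short usage sketch (the `batch` binding is assumed; every batch must
    /// match the provided schema exactly):
    ///
    /// ```ignore
    /// let schema = batch.schema();
    /// let exec = MemorySourceConfig::try_new_from_batches(schema, vec![batch.clone(), batch])?;
    /// ```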
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Arc<DataSourceExec>> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = Self {
            partitions,
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        Ok(DataSourceExec::from_data_source(source))
    }

    /// Set the maximum number of rows to read (the `fetch` limit)
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        self.show_sizes = show_sizes;
        self
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously.
    /// [`EquivalenceProperties`] keeps track of expressions that describe the
    /// global ordering of the schema. These columns are not necessarily the same; e.g.
    /// ```text
    /// ┌-------┐
    /// | a | b |
    /// |---|---|
    /// | 1 | 9 |
    /// | 2 | 8 |
    /// | 3 | 7 |
    /// | 5 | 5 |
    /// └---┴---┘
    /// ```
    /// where both `a ASC` and `b DESC` can describe the table ordering. With
    /// [`EquivalenceProperties`], we can keep track of these equivalences
    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
    ///
    /// Note that if there is an internal projection, that projection will also
    /// be applied to the given `sort_information`.
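    ///
    /// A hedged sketch of declaring a known `a ASC` ordering (the column name
    /// and `schema`/`source` bindings are assumed), mirroring the test at the
    /// bottom of this file:
    ///
    /// ```ignore
    /// use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr};
    ///
    /// let ordering = LexOrdering::new(vec![PhysicalSortExpr {
    ///     expr: col("a", &schema)?,
    ///     options: Default::default(),
    /// }]);
    /// let source = source.try_with_sort_information(vec![ordering])?;
    /// ```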
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // All sort expressions must refer to the original schema
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the MemorySourceConfig",
                col
            );
        }

        // If there is a projection on the source, we also need to project orderings
        if let Some(projection) = &self.projection {
            let base_eqp = EquivalenceProperties::new_with_orderings(
                self.original_schema(),
                &sort_information,
            );
            let proj_exprs = projection
                .iter()
                .map(|idx| {
                    let base_schema = self.original_schema();
                    let name = base_schema.field(*idx).name();
                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
                })
                .collect::<Vec<_>>();
            let projection_mapping =
                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
            sort_information = base_eqp
                .project(&projection_mapping, Arc::clone(&self.projected_schema))
                .into_oeq_class()
                .into_inner();
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// Arc clone of the original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

/// Type alias for partition data
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;

/// Implements [`DataSink`] for writing to a [`MemTable`]
///
/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
pub struct MemSink {
    /// Target locations for writing data
    batches: Vec<PartitionData>,
    schema: SchemaRef,
}

impl Debug for MemSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MemSink")
            .field("num_partitions", &self.batches.len())
            .finish()
    }
}

impl DisplayAs for MemSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_count = self.batches.len();
                write!(f, "MemoryTable (partitions={partition_count})")
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}

impl MemSink {
    /// Creates a new [`MemSink`].
    ///
    /// The caller is responsible for ensuring that there is at least one partition to insert into.
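    ///
    /// A minimal construction sketch (the `schema` binding is assumed):
    ///
    /// ```ignore
    /// // One writable partition backed by a shared, lockable Vec<RecordBatch>
    /// let partition: PartitionData = Arc::new(RwLock::new(vec![]));
    /// let sink = MemSink::try_new(vec![partition], Arc::clone(&schema))?;
    /// ```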
    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Cannot insert into MemTable with zero partitions");
        }
        Ok(Self { batches, schema })
    }
}

#[async_trait]
impl DataSink for MemSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let num_partitions = self.batches.len();

        // Buffer up the data, round-robin style, into `num_partitions` buffers
        let mut new_batches = vec![vec![]; num_partitions];
        let mut i = 0;
        let mut row_count = 0;
        while let Some(batch) = data.next().await.transpose()? {
            row_count += batch.num_rows();
            new_batches[i].push(batch);
            i = (i + 1) % num_partitions;
        }

        // Write the outputs into the batches
        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
            // Append all the new batches in one go to minimize locking overhead
            target.write().await.append(&mut batches);
        }

        Ok(row_count as u64)
    }
}

#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1 = LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]);
        let sort2 = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort1.clone());
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    // Test issue: https://github.com/apache/datafusion/issues/8763
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // Test that a null value is rejected
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8), // not important
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows), // there are only nulls
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
}