datafusion_datasource/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Deref;
24use std::sync::Arc;
25
26use crate::sink::DataSink;
27use crate::source::{DataSource, DataSourceExec};
28
29use arrow::array::{RecordBatch, RecordBatchOptions};
30use arrow::datatypes::{Schema, SchemaRef};
31use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::equivalence::{
34    OrderingEquivalenceClass, ProjectionMapping,
35};
36use datafusion_physical_expr::expressions::Column;
37use datafusion_physical_expr::utils::collect_columns;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39use datafusion_physical_plan::memory::MemoryStream;
40use datafusion_physical_plan::projection::{
41    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
42};
43use datafusion_physical_plan::{
44    common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
45    SendableRecordBatchStream, Statistics,
46};
47
48use async_trait::async_trait;
49use datafusion_physical_plan::coop::cooperative;
50use datafusion_physical_plan::execution_plan::SchedulingType;
51use futures::StreamExt;
52use itertools::Itertools;
53use tokio::sync::RwLock;
54
55/// Data source configuration for reading in-memory batches of data
56#[derive(Clone, Debug)]
57pub struct MemorySourceConfig {
58    /// The partitions to query.
59    ///
60    /// Each partition is a `Vec<RecordBatch>`.
61    partitions: Vec<Vec<RecordBatch>>,
62    /// Schema representing the data before projection
63    schema: SchemaRef,
64    /// Schema representing the data after the optional projection is applied
65    projected_schema: SchemaRef,
66    /// Optional projection
67    projection: Option<Vec<usize>>,
68    /// Sort information: one or more equivalent orderings
69    sort_information: Vec<LexOrdering>,
70    /// if partition sizes should be displayed
71    show_sizes: bool,
72    /// The maximum number of records to read from this plan. If `None`,
73    /// all records after filtering are returned.
74    fetch: Option<usize>,
75}
76
77impl DataSource for MemorySourceConfig {
78    fn open(
79        &self,
80        partition: usize,
81        _context: Arc<TaskContext>,
82    ) -> Result<SendableRecordBatchStream> {
83        Ok(Box::pin(cooperative(
84            MemoryStream::try_new(
85                self.partitions[partition].clone(),
86                Arc::clone(&self.projected_schema),
87                self.projection.clone(),
88            )?
89            .with_fetch(self.fetch),
90        )))
91    }
92
93    fn as_any(&self) -> &dyn Any {
94        self
95    }
96
97    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
98        match t {
99            DisplayFormatType::Default | DisplayFormatType::Verbose => {
100                let partition_sizes: Vec<_> =
101                    self.partitions.iter().map(|b| b.len()).collect();
102
103                let output_ordering = self
104                    .sort_information
105                    .first()
106                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
107                    .unwrap_or_default();
108
109                let eq_properties = self.eq_properties();
110                let constraints = eq_properties.constraints();
111                let constraints = if constraints.is_empty() {
112                    String::new()
113                } else {
114                    format!(", {constraints}")
115                };
116
117                let limit = self
118                    .fetch
119                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
120                if self.show_sizes {
121                    write!(
122                                f,
123                                "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
124                                partition_sizes.len(),
125                            )
126                } else {
127                    write!(
128                        f,
129                        "partitions={}{limit}{output_ordering}{constraints}",
130                        partition_sizes.len(),
131                    )
132                }
133            }
134            DisplayFormatType::TreeRender => {
135                let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
136                let total_bytes: usize = self
137                    .partitions
138                    .iter()
139                    .flatten()
140                    .map(|batch| batch.get_array_memory_size())
141                    .sum();
142                writeln!(f, "format=memory")?;
143                writeln!(f, "rows={total_rows}")?;
144                writeln!(f, "bytes={total_bytes}")?;
145                Ok(())
146            }
147        }
148    }
149
150    /// If possible, redistribute batches across partitions according to their size.
151    ///
152    /// Returns `Ok(None)` if unable to repartition. Preserve output ordering if exists.
153    /// Refer to [`DataSource::repartitioned`] for further details.
154    fn repartitioned(
155        &self,
156        target_partitions: usize,
157        _repartition_file_min_size: usize,
158        output_ordering: Option<LexOrdering>,
159    ) -> Result<Option<Arc<dyn DataSource>>> {
160        if self.partitions.is_empty() || self.partitions.len() >= target_partitions
161        // if have no partitions, or already have more partitions than desired, do not repartition
162        {
163            return Ok(None);
164        }
165
166        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
167            self.repartition_preserving_order(target_partitions, output_ordering)?
168        } else {
169            self.repartition_evenly_by_size(target_partitions)?
170        };
171
172        if let Some(repartitioned) = maybe_repartitioned {
173            Ok(Some(Arc::new(Self::try_new(
174                &repartitioned,
175                self.original_schema(),
176                self.projection.clone(),
177            )?)))
178        } else {
179            Ok(None)
180        }
181    }
182
183    fn output_partitioning(&self) -> Partitioning {
184        Partitioning::UnknownPartitioning(self.partitions.len())
185    }
186
187    fn eq_properties(&self) -> EquivalenceProperties {
188        EquivalenceProperties::new_with_orderings(
189            Arc::clone(&self.projected_schema),
190            self.sort_information.clone(),
191        )
192    }
193
194    fn scheduling_type(&self) -> SchedulingType {
195        SchedulingType::Cooperative
196    }
197
198    fn statistics(&self) -> Result<Statistics> {
199        Ok(common::compute_record_batch_statistics(
200            &self.partitions,
201            &self.schema,
202            self.projection.clone(),
203        ))
204    }
205
206    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
207        let source = self.clone();
208        Some(Arc::new(source.with_limit(limit)))
209    }
210
211    fn fetch(&self) -> Option<usize> {
212        self.fetch
213    }
214
215    fn try_swapping_with_projection(
216        &self,
217        projection: &[ProjectionExpr],
218    ) -> Result<Option<Arc<dyn DataSource>>> {
219        // If there is any non-column or alias-carrier expression, Projection should not be removed.
220        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
221        all_alias_free_columns(projection)
222            .then(|| {
223                let all_projections = (0..self.schema.fields().len()).collect();
224                let new_projections = new_projections_for_columns(
225                    projection,
226                    self.projection().as_ref().unwrap_or(&all_projections),
227                );
228
229                MemorySourceConfig::try_new(
230                    self.partitions(),
231                    self.original_schema(),
232                    Some(new_projections),
233                )
234                .map(|s| Arc::new(s) as Arc<dyn DataSource>)
235            })
236            .transpose()
237    }
238}
239
240impl MemorySourceConfig {
241    /// Create a new `MemorySourceConfig` for reading in-memory record batches
242    /// The provided `schema` should not have the projection applied.
243    pub fn try_new(
244        partitions: &[Vec<RecordBatch>],
245        schema: SchemaRef,
246        projection: Option<Vec<usize>>,
247    ) -> Result<Self> {
248        let projected_schema = project_schema(&schema, projection.as_ref())?;
249        Ok(Self {
250            partitions: partitions.to_vec(),
251            schema,
252            projected_schema,
253            projection,
254            sort_information: vec![],
255            show_sizes: true,
256            fetch: None,
257        })
258    }
259
260    /// Create a new `DataSourceExec` plan for reading in-memory record batches
261    /// The provided `schema` should not have the projection applied.
262    pub fn try_new_exec(
263        partitions: &[Vec<RecordBatch>],
264        schema: SchemaRef,
265        projection: Option<Vec<usize>>,
266    ) -> Result<Arc<DataSourceExec>> {
267        let source = Self::try_new(partitions, schema, projection)?;
268        Ok(DataSourceExec::from_data_source(source))
269    }
270
271    /// Create a new execution plan from a list of constant values (`ValuesExec`)
272    pub fn try_new_as_values(
273        schema: SchemaRef,
274        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
275    ) -> Result<Arc<DataSourceExec>> {
276        if data.is_empty() {
277            return plan_err!("Values list cannot be empty");
278        }
279
280        let n_row = data.len();
281        let n_col = schema.fields().len();
282
283        // We have this single row batch as a placeholder to satisfy evaluation argument
284        // and generate a single output row
285        let placeholder_schema = Arc::new(Schema::empty());
286        let placeholder_batch = RecordBatch::try_new_with_options(
287            Arc::clone(&placeholder_schema),
288            vec![],
289            &RecordBatchOptions::new().with_row_count(Some(1)),
290        )?;
291
292        // Evaluate each column
293        let arrays = (0..n_col)
294            .map(|j| {
295                (0..n_row)
296                    .map(|i| {
297                        let expr = &data[i][j];
298                        let result = expr.evaluate(&placeholder_batch)?;
299
300                        match result {
301                            ColumnarValue::Scalar(scalar) => Ok(scalar),
302                            ColumnarValue::Array(array) if array.len() == 1 => {
303                                ScalarValue::try_from_array(&array, 0)
304                            }
305                            ColumnarValue::Array(_) => {
306                                plan_err!("Cannot have array values in a values list")
307                            }
308                        }
309                    })
310                    .collect::<Result<Vec<_>>>()
311                    .and_then(ScalarValue::iter_to_array)
312            })
313            .collect::<Result<Vec<_>>>()?;
314
315        let batch = RecordBatch::try_new_with_options(
316            Arc::clone(&schema),
317            arrays,
318            &RecordBatchOptions::new().with_row_count(Some(n_row)),
319        )?;
320
321        let partitions = vec![batch];
322        Self::try_new_from_batches(Arc::clone(&schema), partitions)
323    }
324
325    /// Create a new plan using the provided schema and batches.
326    ///
327    /// Errors if any of the batches don't match the provided schema, or if no
328    /// batches are provided.
329    pub fn try_new_from_batches(
330        schema: SchemaRef,
331        batches: Vec<RecordBatch>,
332    ) -> Result<Arc<DataSourceExec>> {
333        if batches.is_empty() {
334            return plan_err!("Values list cannot be empty");
335        }
336
337        for batch in &batches {
338            let batch_schema = batch.schema();
339            if batch_schema != schema {
340                return plan_err!(
341                    "Batch has invalid schema. Expected: {}, got: {}",
342                    schema,
343                    batch_schema
344                );
345            }
346        }
347
348        let partitions = vec![batches];
349        let source = Self {
350            partitions,
351            schema: Arc::clone(&schema),
352            projected_schema: Arc::clone(&schema),
353            projection: None,
354            sort_information: vec![],
355            show_sizes: true,
356            fetch: None,
357        };
358        Ok(DataSourceExec::from_data_source(source))
359    }
360
361    /// Set the limit of the files
362    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
363        self.fetch = limit;
364        self
365    }
366
367    /// Set `show_sizes` to determine whether to display partition sizes
368    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
369        self.show_sizes = show_sizes;
370        self
371    }
372
373    /// Ref to partitions
374    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
375        &self.partitions
376    }
377
378    /// Ref to projection
379    pub fn projection(&self) -> &Option<Vec<usize>> {
380        &self.projection
381    }
382
383    /// Show sizes
384    pub fn show_sizes(&self) -> bool {
385        self.show_sizes
386    }
387
388    /// Ref to sort information
389    pub fn sort_information(&self) -> &[LexOrdering] {
390        &self.sort_information
391    }
392
393    /// A memory table can be ordered by multiple expressions simultaneously.
394    /// [`EquivalenceProperties`] keeps track of expressions that describe the
395    /// global ordering of the schema. These columns are not necessarily same; e.g.
396    /// ```text
397    /// ┌-------┐
398    /// | a | b |
399    /// |---|---|
400    /// | 1 | 9 |
401    /// | 2 | 8 |
402    /// | 3 | 7 |
403    /// | 5 | 5 |
404    /// └---┴---┘
405    /// ```
406    /// where both `a ASC` and `b DESC` can describe the table ordering. With
407    /// [`EquivalenceProperties`], we can keep track of these equivalences
408    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
409    ///
410    /// Note that if there is an internal projection, that projection will be
411    /// also applied to the given `sort_information`.
412    pub fn try_with_sort_information(
413        mut self,
414        mut sort_information: Vec<LexOrdering>,
415    ) -> Result<Self> {
416        // All sort expressions must refer to the original schema
417        let fields = self.schema.fields();
418        let ambiguous_column = sort_information
419            .iter()
420            .flat_map(|ordering| ordering.clone())
421            .flat_map(|expr| collect_columns(&expr.expr))
422            .find(|col| {
423                fields
424                    .get(col.index())
425                    .map(|field| field.name() != col.name())
426                    .unwrap_or(true)
427            });
428        if let Some(col) = ambiguous_column {
429            return internal_err!(
430                "Column {:?} is not found in the original schema of the MemorySourceConfig",
431                col
432            );
433        }
434
435        // If there is a projection on the source, we also need to project orderings
436        if let Some(projection) = &self.projection {
437            let base_schema = self.original_schema();
438            let proj_exprs = projection.iter().map(|idx| {
439                let name = base_schema.field(*idx).name();
440                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
441            });
442            let projection_mapping =
443                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
444            let base_eqp = EquivalenceProperties::new_with_orderings(
445                Arc::clone(&base_schema),
446                sort_information,
447            );
448            let proj_eqp =
449                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
450            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
451            sort_information = oeq_class.into();
452        }
453
454        self.sort_information = sort_information;
455        Ok(self)
456    }
457
458    /// Arc clone of ref to original schema
459    pub fn original_schema(&self) -> SchemaRef {
460        Arc::clone(&self.schema)
461    }
462
463    /// Repartition while preserving order.
464    ///
465    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
466    /// as having too few batches to fulfill the `target_partitions` or if unable
467    /// to preserve output ordering.
468    fn repartition_preserving_order(
469        &self,
470        target_partitions: usize,
471        output_ordering: LexOrdering,
472    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
473        if !self.eq_properties().ordering_satisfy(output_ordering)? {
474            Ok(None)
475        } else {
476            let total_num_batches =
477                self.partitions.iter().map(|b| b.len()).sum::<usize>();
478            if total_num_batches < target_partitions {
479                // no way to create the desired repartitioning
480                return Ok(None);
481            }
482
483            let cnt_to_repartition = target_partitions - self.partitions.len();
484
485            // Label the current partitions and their order.
486            // Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
487            let to_repartition = self
488                .partitions
489                .iter()
490                .enumerate()
491                .map(|(idx, batches)| RePartition {
492                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
493                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
494                    batches: batches.clone(),
495                })
496                .collect_vec();
497
498            // Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
499            // by count of rows.
500            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
501            for rep in to_repartition {
502                max_heap.push(CompareByRowCount(rep));
503            }
504
505            // Split the largest partitions into smaller partitions. Maintaining the output
506            // order of the partitions & newly created partitions.
507            let mut cannot_split_further = Vec::with_capacity(target_partitions);
508            for _ in 0..cnt_to_repartition {
509                // triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
510                loop {
511                    // Take the largest item off the heap, and attempt to split.
512                    let Some(to_split) = max_heap.pop() else {
513                        // Nothing left to attempt repartition. Break inner loop.
514                        break;
515                    };
516
517                    // Split the partition. The new partitions will be ordered with idx and idx+1.
518                    let mut new_partitions = to_split.into_inner().split();
519                    if new_partitions.len() > 1 {
520                        for new_partition in new_partitions {
521                            max_heap.push(CompareByRowCount(new_partition));
522                        }
523                        // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
524                        break;
525                    } else {
526                        cannot_split_further.push(new_partitions.remove(0));
527                    }
528                }
529            }
530            let mut partitions = max_heap
531                .drain()
532                .map(CompareByRowCount::into_inner)
533                .collect_vec();
534            partitions.extend(cannot_split_further);
535
536            // Finally, sort all partitions by the output ordering.
537            // This was the original ordering of the batches within the partition. We are maintaining this ordering.
538            partitions.sort_by_key(|p| p.idx);
539            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
540
541            Ok(Some(partitions))
542        }
543    }
544
545    /// Repartition into evenly sized chunks (as much as possible without batch splitting),
546    /// disregarding any ordering.
547    ///
548    /// Current implementation uses a first-fit-decreasing bin packing, modified to enable
549    /// us to still return the desired count of `target_partitions`.
550    ///
551    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
552    /// as having too few batches to fulfill the `target_partitions`.
553    fn repartition_evenly_by_size(
554        &self,
555        target_partitions: usize,
556    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
557        // determine if we have enough total batches to fulfill request
558        let mut flatten_batches =
559            self.partitions.clone().into_iter().flatten().collect_vec();
560        if flatten_batches.len() < target_partitions {
561            return Ok(None);
562        }
563
564        // Take all flattened batches (all in 1 partititon/vec) and divide evenly into the desired number of `target_partitions`.
565        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
566        // sort by size, so we pack multiple smaller batches into the same partition
567        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
568
569        // Divide.
570        let mut partitions =
571            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
572        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
573        let mut total_rows_seen = 0;
574        let mut curr_bin_row_count = 0;
575        let mut idx = 0;
576        for batch in flatten_batches {
577            let row_cnt = batch.num_rows();
578            idx = std::cmp::min(idx, target_partitions - 1);
579
580            partitions[idx].push(batch);
581            curr_bin_row_count += row_cnt;
582            total_rows_seen += row_cnt;
583
584            if curr_bin_row_count >= target_partition_size {
585                idx += 1;
586                curr_bin_row_count = 0;
587
588                // update target_partition_size, to handle very lopsided batch distributions
589                // while still returning the count of `target_partitions`
590                if total_rows_seen < total_num_rows {
591                    target_partition_size = (total_num_rows - total_rows_seen)
592                        .div_ceil(target_partitions - idx);
593                }
594            }
595        }
596
597        Ok(Some(partitions))
598    }
599}
600
601/// For use in repartitioning, track the total size and original partition index.
602///
603/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
604struct RePartition {
605    /// Original output ordering for the partition.
606    idx: usize,
607    /// Total size of the partition, for use in heap ordering
608    /// (a.k.a. splitting up the largest partitions).
609    row_count: usize,
610    /// A partition containing record batches.
611    batches: Vec<RecordBatch>,
612}
613
614impl RePartition {
615    /// Split [`RePartition`] into 2 pieces, consuming self.
616    ///
617    /// Returns only 1 partition if cannot be split further.
618    fn split(self) -> Vec<Self> {
619        if self.batches.len() == 1 {
620            return vec![self];
621        }
622
623        let new_0 = RePartition {
624            idx: self.idx, // output ordering
625            row_count: 0,
626            batches: vec![],
627        };
628        let new_1 = RePartition {
629            idx: self.idx + 1, // output ordering +1
630            row_count: 0,
631            batches: vec![],
632        };
633        let split_pt = self.row_count / 2;
634
635        let [new_0, new_1] = self.batches.into_iter().fold(
636            [new_0, new_1],
637            |[mut new0, mut new1], batch| {
638                if new0.row_count < split_pt {
639                    new0.add_batch(batch);
640                } else {
641                    new1.add_batch(batch);
642                }
643                [new0, new1]
644            },
645        );
646        vec![new_0, new_1]
647    }
648
649    fn add_batch(&mut self, batch: RecordBatch) {
650        self.row_count += batch.num_rows();
651        self.batches.push(batch);
652    }
653}
654
655impl fmt::Display for RePartition {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        write!(
658            f,
659            "{}rows-in-{}batches@{}",
660            self.row_count,
661            self.batches.len(),
662            self.idx
663        )
664    }
665}
666
667struct CompareByRowCount(RePartition);
668impl CompareByRowCount {
669    fn into_inner(self) -> RePartition {
670        self.0
671    }
672}
673impl Ord for CompareByRowCount {
674    fn cmp(&self, other: &Self) -> Ordering {
675        self.0.row_count.cmp(&other.0.row_count)
676    }
677}
678impl PartialOrd for CompareByRowCount {
679    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680        Some(self.cmp(other))
681    }
682}
683impl PartialEq for CompareByRowCount {
684    fn eq(&self, other: &Self) -> bool {
685        // PartialEq must be consistent with PartialOrd
686        self.cmp(other) == Ordering::Equal
687    }
688}
689impl Eq for CompareByRowCount {}
690impl Deref for CompareByRowCount {
691    type Target = RePartition;
692    fn deref(&self) -> &Self::Target {
693        &self.0
694    }
695}
696
697/// Type alias for partition data
698pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
699
700/// Implements for writing to a [`MemTable`]
701///
702/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
703pub struct MemSink {
704    /// Target locations for writing data
705    batches: Vec<PartitionData>,
706    schema: SchemaRef,
707}
708
709impl Debug for MemSink {
710    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711        f.debug_struct("MemSink")
712            .field("num_partitions", &self.batches.len())
713            .finish()
714    }
715}
716
717impl DisplayAs for MemSink {
718    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
719        match t {
720            DisplayFormatType::Default | DisplayFormatType::Verbose => {
721                let partition_count = self.batches.len();
722                write!(f, "MemoryTable (partitions={partition_count})")
723            }
724            DisplayFormatType::TreeRender => {
725                // TODO: collect info
726                write!(f, "")
727            }
728        }
729    }
730}
731
732impl MemSink {
733    /// Creates a new [`MemSink`].
734    ///
735    /// The caller is responsible for ensuring that there is at least one partition to insert into.
736    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
737        if batches.is_empty() {
738            return plan_err!("Cannot insert into MemTable with zero partitions");
739        }
740        Ok(Self { batches, schema })
741    }
742}
743
744#[async_trait]
745impl DataSink for MemSink {
746    fn as_any(&self) -> &dyn Any {
747        self
748    }
749
750    fn schema(&self) -> &SchemaRef {
751        &self.schema
752    }
753
754    async fn write_all(
755        &self,
756        mut data: SendableRecordBatchStream,
757        _context: &Arc<TaskContext>,
758    ) -> Result<u64> {
759        let num_partitions = self.batches.len();
760
761        // buffer up the data round robin style into num_partitions
762
763        let mut new_batches = vec![vec![]; num_partitions];
764        let mut i = 0;
765        let mut row_count = 0;
766        while let Some(batch) = data.next().await.transpose()? {
767            row_count += batch.num_rows();
768            new_batches[i].push(batch);
769            i = (i + 1) % num_partitions;
770        }
771
772        // write the outputs into the batches
773        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
774            // Append all the new batches in one go to minimize locking overhead
775            target.write().await.append(&mut batches);
776        }
777
778        Ok(row_count as u64)
779    }
780}
781
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Every ordering passed to `try_with_sort_information` must show up in
    /// the resulting plan's equivalence properties, and the reported output
    /// ordering is their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(
            ["a", "b", "c"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Int64, false))
                .collect::<Vec<_>>(),
        ));

        // Sort expression on `name` with default options.
        let sort_on = |name: &str| -> Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };
        let sort1: LexOrdering = [sort_on("a")?, sort_on("b")?].into();
        let sort2: LexOrdering = [sort_on("c")?].into();

        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        let properties = mem_exec.properties();
        assert_eq!(properties.output_ordering().unwrap(), &expected_output_order);

        let oeq_class = properties.equivalence_properties().oeq_class();
        assert!(oeq_class.contains(&sort1));
        assert!(oeq_class.contains(&sort2));
        Ok(())
    }
}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842    use crate::test_util::col;
843    use crate::tests::{aggr_test_schema, make_partition};
844
845    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
846    use arrow::datatypes::{DataType, Field};
847    use datafusion_common::assert_batches_eq;
848    use datafusion_common::stats::{ColumnStatistics, Precision};
849    use datafusion_physical_expr::PhysicalSortExpr;
850    use datafusion_physical_plan::expressions::lit;
851
852    use datafusion_physical_plan::ExecutionPlan;
853    use futures::StreamExt;
854
855    #[tokio::test]
856    async fn exec_with_limit() -> Result<()> {
857        let task_ctx = Arc::new(TaskContext::default());
858        let batch = make_partition(7);
859        let schema = batch.schema();
860        let batches = vec![batch.clone(), batch];
861
862        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
863        assert_eq!(exec.fetch(), None);
864
865        let exec = exec.with_fetch(Some(4)).unwrap();
866        assert_eq!(exec.fetch(), Some(4));
867
868        let mut it = exec.execute(0, task_ctx)?;
869        let mut results = vec![];
870        while let Some(batch) = it.next().await {
871            results.push(batch?);
872        }
873
874        let expected = [
875            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
876        ];
877        assert_batches_eq!(expected, &results);
878        Ok(())
879    }
880
881    #[tokio::test]
882    async fn values_empty_case() -> Result<()> {
883        let schema = aggr_test_schema();
884        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
885        assert!(empty.is_err());
886        Ok(())
887    }
888
889    #[test]
890    fn new_exec_with_batches() {
891        let batch = make_partition(7);
892        let schema = batch.schema();
893        let batches = vec![batch.clone(), batch];
894        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
895    }
896
897    #[test]
898    fn new_exec_with_batches_empty() {
899        let batch = make_partition(7);
900        let schema = batch.schema();
901        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
902    }
903
904    #[test]
905    fn new_exec_with_batches_invalid_schema() {
906        let batch = make_partition(7);
907        let batches = vec![batch.clone(), batch];
908
909        let invalid_schema = Arc::new(Schema::new(vec![
910            Field::new("col0", DataType::UInt32, false),
911            Field::new("col1", DataType::Utf8, false),
912        ]));
913        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
914            .unwrap_err();
915    }
916
917    // Test issue: https://github.com/apache/datafusion/issues/8763
918    #[test]
919    fn new_exec_with_non_nullable_schema() {
920        let schema = Arc::new(Schema::new(vec![Field::new(
921            "col0",
922            DataType::UInt32,
923            false,
924        )]));
925        let _ = MemorySourceConfig::try_new_as_values(
926            Arc::clone(&schema),
927            vec![vec![lit(1u32)]],
928        )
929        .unwrap();
930        // Test that a null value is rejected
931        let _ = MemorySourceConfig::try_new_as_values(
932            schema,
933            vec![vec![lit(ScalarValue::UInt32(None))]],
934        )
935        .unwrap_err();
936    }
937
938    #[test]
939    fn values_stats_with_nulls_only() -> Result<()> {
940        let data = vec![
941            vec![lit(ScalarValue::Null)],
942            vec![lit(ScalarValue::Null)],
943            vec![lit(ScalarValue::Null)],
944        ];
945        let rows = data.len();
946        let values = MemorySourceConfig::try_new_as_values(
947            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
948            data,
949        )?;
950
951        assert_eq!(
952            values.partition_statistics(None)?,
953            Statistics {
954                num_rows: Precision::Exact(rows),
955                total_byte_size: Precision::Exact(8), // not important
956                column_statistics: vec![ColumnStatistics {
957                    null_count: Precision::Exact(rows), // there are only nulls
958                    distinct_count: Precision::Absent,
959                    max_value: Precision::Absent,
960                    min_value: Precision::Absent,
961                    sum_value: Precision::Absent,
962                },],
963            }
964        );
965
966        Ok(())
967    }
968
969    fn batch(row_size: usize) -> RecordBatch {
970        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
971        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
972        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
973        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
974    }
975
976    fn schema() -> SchemaRef {
977        batch(1).schema()
978    }
979
980    fn memorysrcconfig_no_partitions(
981        sort_information: Vec<LexOrdering>,
982    ) -> Result<MemorySourceConfig> {
983        let partitions = vec![];
984        MemorySourceConfig::try_new(&partitions, schema(), None)?
985            .try_with_sort_information(sort_information)
986    }
987
988    fn memorysrcconfig_1_partition_1_batch(
989        sort_information: Vec<LexOrdering>,
990    ) -> Result<MemorySourceConfig> {
991        let partitions = vec![vec![batch(100)]];
992        MemorySourceConfig::try_new(&partitions, schema(), None)?
993            .try_with_sort_information(sort_information)
994    }
995
996    fn memorysrcconfig_3_partitions_1_batch_each(
997        sort_information: Vec<LexOrdering>,
998    ) -> Result<MemorySourceConfig> {
999        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
1000        MemorySourceConfig::try_new(&partitions, schema(), None)?
1001            .try_with_sort_information(sort_information)
1002    }
1003
1004    fn memorysrcconfig_3_partitions_with_2_batches_each(
1005        sort_information: Vec<LexOrdering>,
1006    ) -> Result<MemorySourceConfig> {
1007        let partitions = vec![
1008            vec![batch(100), batch(100)],
1009            vec![batch(100), batch(100)],
1010            vec![batch(100), batch(100)],
1011        ];
1012        MemorySourceConfig::try_new(&partitions, schema(), None)?
1013            .try_with_sort_information(sort_information)
1014    }
1015
1016    /// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
1017    /// in the Memtable partition (a.k.a. vector of batches).
1018    fn memorysrcconfig_1_partition_with_different_sized_batches(
1019        sort_information: Vec<LexOrdering>,
1020    ) -> Result<MemorySourceConfig> {
1021        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1022        MemorySourceConfig::try_new(&partitions, schema(), None)?
1023            .try_with_sort_information(sort_information)
1024    }
1025
1026    /// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
1027    /// but the batches are ordered differently (not by size)
1028    /// in the Memtable partition (a.k.a. vector of batches).
1029    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1030        sort_information: Vec<LexOrdering>,
1031    ) -> Result<MemorySourceConfig> {
1032        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1033        MemorySourceConfig::try_new(&partitions, schema(), None)?
1034            .try_with_sort_information(sort_information)
1035    }
1036
1037    fn memorysrcconfig_2_partition_with_different_sized_batches(
1038        sort_information: Vec<LexOrdering>,
1039    ) -> Result<MemorySourceConfig> {
1040        let partitions = vec![
1041            vec![batch(100_000), batch(10_000), batch(1_000)],
1042            vec![batch(2_000), batch(20)],
1043        ];
1044        MemorySourceConfig::try_new(&partitions, schema(), None)?
1045            .try_with_sort_information(sort_information)
1046    }
1047
1048    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1049        sort_information: Vec<LexOrdering>,
1050    ) -> Result<MemorySourceConfig> {
1051        let partitions = vec![
1052            vec![
1053                batch(100_000),
1054                batch(1),
1055                batch(1),
1056                batch(1),
1057                batch(1),
1058                batch(0),
1059            ],
1060            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1061        ];
1062        MemorySourceConfig::try_new(&partitions, schema(), None)?
1063            .try_with_sort_information(sort_information)
1064    }
1065
1066    /// Assert that we get the expected count of partitions after repartitioning.
1067    ///
1068    /// If None, then we expected the [`DataSource::repartitioned`] to return None.
1069    fn assert_partitioning(
1070        partitioned_datasrc: Option<Arc<dyn DataSource>>,
1071        partition_cnt: Option<usize>,
1072    ) {
1073        let should_exist = if let Some(partition_cnt) = partition_cnt {
1074            format!("new datasource should exist and have {partition_cnt:?} partitions")
1075        } else {
1076            "new datasource should not exist".into()
1077        };
1078
1079        let actual = partitioned_datasrc
1080            .map(|datasrc| datasrc.output_partitioning().partition_count());
1081        assert_eq!(
1082            actual,
1083            partition_cnt,
1084            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1085        );
1086    }
1087
1088    fn run_all_test_scenarios(
1089        output_ordering: Option<LexOrdering>,
1090        sort_information_on_config: Vec<LexOrdering>,
1091    ) -> Result<()> {
1092        let not_used = usize::MAX;
1093
1094        // src has no partitions
1095        let mem_src_config =
1096            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1097        let partitioned_datasrc =
1098            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1099        assert_partitioning(partitioned_datasrc, None);
1100
1101        // src has partitions == target partitions (=1)
1102        let target_partitions = 1;
1103        let mem_src_config =
1104            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1105        let partitioned_datasrc = mem_src_config.repartitioned(
1106            target_partitions,
1107            not_used,
1108            output_ordering.clone(),
1109        )?;
1110        assert_partitioning(partitioned_datasrc, None);
1111
1112        // src has partitions == target partitions (=3)
1113        let target_partitions = 3;
1114        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1115            sort_information_on_config.clone(),
1116        )?;
1117        let partitioned_datasrc = mem_src_config.repartitioned(
1118            target_partitions,
1119            not_used,
1120            output_ordering.clone(),
1121        )?;
1122        assert_partitioning(partitioned_datasrc, None);
1123
1124        // src has partitions > target partitions, but we don't merge them
1125        let target_partitions = 2;
1126        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1127            sort_information_on_config.clone(),
1128        )?;
1129        let partitioned_datasrc = mem_src_config.repartitioned(
1130            target_partitions,
1131            not_used,
1132            output_ordering.clone(),
1133        )?;
1134        assert_partitioning(partitioned_datasrc, None);
1135
1136        // src has partitions < target partitions, but not enough batches (per partition) to split into more partitions
1137        let target_partitions = 4;
1138        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1139            sort_information_on_config.clone(),
1140        )?;
1141        let partitioned_datasrc = mem_src_config.repartitioned(
1142            target_partitions,
1143            not_used,
1144            output_ordering.clone(),
1145        )?;
1146        assert_partitioning(partitioned_datasrc, None);
1147
1148        // src has partitions < target partitions, and can split to sufficient amount
1149        // has 6 batches across 3 partitions. Will need to split 2 of it's partitions.
1150        let target_partitions = 5;
1151        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1152            sort_information_on_config.clone(),
1153        )?;
1154        let partitioned_datasrc = mem_src_config.repartitioned(
1155            target_partitions,
1156            not_used,
1157            output_ordering.clone(),
1158        )?;
1159        assert_partitioning(partitioned_datasrc, Some(5));
1160
1161        // src has partitions < target partitions, and can split to sufficient amount
1162        // has 6 batches across 3 partitions. Will need to split all of it's partitions.
1163        let target_partitions = 6;
1164        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1165            sort_information_on_config.clone(),
1166        )?;
1167        let partitioned_datasrc = mem_src_config.repartitioned(
1168            target_partitions,
1169            not_used,
1170            output_ordering.clone(),
1171        )?;
1172        assert_partitioning(partitioned_datasrc, Some(6));
1173
1174        // src has partitions < target partitions, but not enough total batches to fulfill the split (desired target_partitions)
1175        let target_partitions = 3 * 2 + 1;
1176        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1177            sort_information_on_config.clone(),
1178        )?;
1179        let partitioned_datasrc = mem_src_config.repartitioned(
1180            target_partitions,
1181            not_used,
1182            output_ordering.clone(),
1183        )?;
1184        assert_partitioning(partitioned_datasrc, None);
1185
1186        // src has 1 partition with many batches of lopsided sizes
1187        // make sure it handles the split properly
1188        let target_partitions = 2;
1189        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1190            sort_information_on_config,
1191        )?;
1192        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1193            target_partitions,
1194            not_used,
1195            output_ordering,
1196        )?;
1197        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1198        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
1199        // It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
1200        let partitioned_datasrc = partitioned_datasrc.unwrap();
1201        let Some(mem_src_config) = partitioned_datasrc
1202            .as_any()
1203            .downcast_ref::<MemorySourceConfig>()
1204        else {
1205            unreachable!()
1206        };
1207        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1208        assert_eq!(repartitioned_raw_batches.len(), 2);
1209        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1210            unreachable!()
1211        };
1212        // p1=batch(100_000)
1213        assert_eq!(p1.len(), 1);
1214        assert_eq!(p1[0].num_rows(), 100_000);
1215        // p2=[batch(10_000), batch(100), batch(1)]
1216        assert_eq!(p2.len(), 3);
1217        assert_eq!(p2[0].num_rows(), 10_000);
1218        assert_eq!(p2[1].num_rows(), 100);
1219        assert_eq!(p2[2].num_rows(), 1);
1220
1221        Ok(())
1222    }
1223
1224    #[test]
1225    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
1226        let no_sort = vec![];
1227        let no_output_ordering = None;
1228
1229        // Test: common set of functionality
1230        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
1231
1232        // Test: how no-sort-order divides differently.
1233        //    * does not preserve separate partitions (with own internal ordering) on even split,
1234        //    * nor does it preserve ordering (re-orders batch(2_000) vs batch(1_000)).
1235        let target_partitions = 3;
1236        let mem_src_config =
1237            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
1238        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1239            target_partitions,
1240            usize::MAX,
1241            no_output_ordering,
1242        )?;
1243        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1244        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1245        // It should have split as p1=batch(100_000), p2=batch(10_000),  p3=rest(mixed across original partitions)
1246        let repartitioned_raw_batches = mem_src_config
1247            .repartition_evenly_by_size(target_partitions)?
1248            .unwrap();
1249        assert_eq!(repartitioned_raw_batches.len(), 3);
1250        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1251            unreachable!()
1252        };
1253        // p1=batch(100_000)
1254        assert_eq!(p1.len(), 1);
1255        assert_eq!(p1[0].num_rows(), 100_000);
1256        // p2=batch(10_000)
1257        assert_eq!(p2.len(), 1);
1258        assert_eq!(p2[0].num_rows(), 10_000);
1259        // p3= batch(2_000), batch(1_000), batch(20)
1260        assert_eq!(p3.len(), 3);
1261        assert_eq!(p3[0].num_rows(), 2_000);
1262        assert_eq!(p3[1].num_rows(), 1_000);
1263        assert_eq!(p3[2].num_rows(), 20);
1264
1265        Ok(())
1266    }
1267
1268    #[test]
1269    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
1270    ) -> Result<()> {
1271        let no_sort = vec![];
1272        let no_output_ordering = None;
1273
1274        // Test: case has two input partitions:
1275        //     b(100_000), b(1), b(1), b(1), b(1), b(0)
1276        //     b(1), b(1), b(1), b(1), b(0), b(100)
1277        //
1278        // We want an output with target_partitions=5, which means the ideal division is:
1279        //     b(100_000)
1280        //     b(100)
1281        //     b(1), b(1), b(1)
1282        //     b(1), b(1), b(1)
1283        //     b(1), b(1), b(0)
1284        let target_partitions = 5;
1285        let mem_src_config =
1286            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
1287        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1288            target_partitions,
1289            usize::MAX,
1290            no_output_ordering,
1291        )?;
1292        assert_partitioning(partitioned_datasrc.clone(), Some(5));
1293        // Starting partition 1 = batch(100_000), batch(1), batch(1), batch(1), batch(1), batch(0)
1294        // Starting partition 1 = batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)
1295        // It should have split as p1=batch(100_000), p2=batch(100), p3=[batch(1),batch(1)], p4=[batch(1),batch(1)], p5=[batch(1),batch(1),batch(0),batch(0)]
1296        let repartitioned_raw_batches = mem_src_config
1297            .repartition_evenly_by_size(target_partitions)?
1298            .unwrap();
1299        assert_eq!(repartitioned_raw_batches.len(), 5);
1300        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
1301        else {
1302            unreachable!()
1303        };
1304        // p1=batch(100_000)
1305        assert_eq!(p1.len(), 1);
1306        assert_eq!(p1[0].num_rows(), 100_000);
1307        // p2=batch(100)
1308        assert_eq!(p2.len(), 1);
1309        assert_eq!(p2[0].num_rows(), 100);
1310        // p3=[batch(1),batch(1),batch(1)]
1311        assert_eq!(p3.len(), 3);
1312        assert_eq!(p3[0].num_rows(), 1);
1313        assert_eq!(p3[1].num_rows(), 1);
1314        assert_eq!(p3[2].num_rows(), 1);
1315        // p4=[batch(1),batch(1),batch(1)]
1316        assert_eq!(p4.len(), 3);
1317        assert_eq!(p4[0].num_rows(), 1);
1318        assert_eq!(p4[1].num_rows(), 1);
1319        assert_eq!(p4[2].num_rows(), 1);
1320        // p5=[batch(1),batch(1),batch(0),batch(0)]
1321        assert_eq!(p5.len(), 4);
1322        assert_eq!(p5[0].num_rows(), 1);
1323        assert_eq!(p5[1].num_rows(), 1);
1324        assert_eq!(p5[2].num_rows(), 0);
1325        assert_eq!(p5[3].num_rows(), 0);
1326
1327        Ok(())
1328    }
1329
1330    #[test]
1331    fn test_repartition_with_sort_information() -> Result<()> {
1332        let schema = schema();
1333        let sort_key: LexOrdering =
1334            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1335        let has_sort = vec![sort_key.clone()];
1336        let output_ordering = Some(sort_key);
1337
1338        // Test: common set of functionality
1339        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
1340
1341        // Test: DOES preserve separate partitions (with own internal ordering)
1342        let target_partitions = 3;
1343        let mem_src_config =
1344            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
1345        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1346            target_partitions,
1347            usize::MAX,
1348            output_ordering.clone(),
1349        )?;
1350        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1351        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1352        // It should have split as p1=batch(100_000), p2=[batch(10_000),batch(1_000)],  p3=<other_partition>
1353        let Some(output_ord) = output_ordering else {
1354            unreachable!()
1355        };
1356        let repartitioned_raw_batches = mem_src_config
1357            .repartition_preserving_order(target_partitions, output_ord)?
1358            .unwrap();
1359        assert_eq!(repartitioned_raw_batches.len(), 3);
1360        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1361            unreachable!()
1362        };
1363        // p1=batch(100_000)
1364        assert_eq!(p1.len(), 1);
1365        assert_eq!(p1[0].num_rows(), 100_000);
1366        // p2=[batch(10_000),batch(1_000)]
1367        assert_eq!(p2.len(), 2);
1368        assert_eq!(p2[0].num_rows(), 10_000);
1369        assert_eq!(p2[1].num_rows(), 1_000);
1370        // p3=batch(2_000), batch(20)
1371        assert_eq!(p3.len(), 2);
1372        assert_eq!(p3[0].num_rows(), 2_000);
1373        assert_eq!(p3[1].num_rows(), 20);
1374
1375        Ok(())
1376    }
1377
1378    #[test]
1379    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1380        let schema = schema();
1381        let sort_key: LexOrdering =
1382            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1383        let has_sort = vec![sort_key.clone()];
1384        let output_ordering = Some(sort_key);
1385
1386        // src has 1 partition with many batches of lopsided sizes
1387        // note that the input vector of batches are not ordered by decreasing size
1388        let target_partitions = 2;
1389        let mem_src_config =
1390            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1391        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1392            target_partitions,
1393            usize::MAX,
1394            output_ordering,
1395        )?;
1396        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1397        // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
1398        // It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
1399        let partitioned_datasrc = partitioned_datasrc.unwrap();
1400        let Some(mem_src_config) = partitioned_datasrc
1401            .as_any()
1402            .downcast_ref::<MemorySourceConfig>()
1403        else {
1404            unreachable!()
1405        };
1406        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1407        assert_eq!(repartitioned_raw_batches.len(), 2);
1408        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1409            unreachable!()
1410        };
1411        // p1=batch(100_000)
1412        assert_eq!(p1.len(), 1);
1413        assert_eq!(p1[0].num_rows(), 100_000);
1414        // p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
1415        assert_eq!(p2.len(), 3);
1416        assert_eq!(p2[0].num_rows(), 1);
1417        assert_eq!(p2[1].num_rows(), 100);
1418        assert_eq!(p2[2].num_rows(), 10_000);
1419
1420        Ok(())
1421    }
1422}