datafusion_datasource/memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Deref;
24use std::slice::from_ref;
25use std::sync::Arc;
26
27use crate::sink::DataSink;
28use crate::source::{DataSource, DataSourceExec};
29
30use arrow::array::{RecordBatch, RecordBatchOptions};
31use arrow::datatypes::{Schema, SchemaRef};
32use datafusion_common::{
33    Result, ScalarValue, assert_or_internal_err, plan_err, project_schema,
34};
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::equivalence::project_orderings;
37use datafusion_physical_expr::projection::ProjectionExprs;
38use datafusion_physical_expr::utils::collect_columns;
39use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
40use datafusion_physical_plan::memory::MemoryStream;
41use datafusion_physical_plan::projection::{
42    all_alias_free_columns, new_projections_for_columns,
43};
44use datafusion_physical_plan::{
45    ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
46    SendableRecordBatchStream, Statistics, common,
47};
48
49use async_trait::async_trait;
50use datafusion_physical_plan::coop::cooperative;
51use datafusion_physical_plan::execution_plan::SchedulingType;
52use futures::StreamExt;
53use itertools::Itertools;
54use tokio::sync::RwLock;
55
/// Data source configuration for reading in-memory batches of data
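///
/// # Example
///
/// A minimal construction sketch (not compiled as a doctest; the
/// `datafusion_datasource::memory` import path is assumed):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// use arrow::array::{ArrayRef, Int32Array, RecordBatch};
/// use datafusion_datasource::memory::MemorySourceConfig;
///
/// // A single partition containing one three-row batch
/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
/// let batch = RecordBatch::try_from_iter(vec![("a", a)])?;
/// let schema = batch.schema();
///
/// // No projection, so the projected schema equals the original schema
/// let exec = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;
/// ```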
57#[derive(Clone, Debug)]
58pub struct MemorySourceConfig {
59    /// The partitions to query.
60    ///
61    /// Each partition is a `Vec<RecordBatch>`.
62    partitions: Vec<Vec<RecordBatch>>,
63    /// Schema representing the data before projection
64    schema: SchemaRef,
65    /// Schema representing the data after the optional projection is applied
66    projected_schema: SchemaRef,
67    /// Optional projection
68    projection: Option<Vec<usize>>,
69    /// Sort information: one or more equivalent orderings
70    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes should be displayed
72    show_sizes: bool,
73    /// The maximum number of records to read from this plan. If `None`,
74    /// all records after filtering are returned.
75    fetch: Option<usize>,
76}
77
78impl DataSource for MemorySourceConfig {
79    fn open(
80        &self,
81        partition: usize,
82        _context: Arc<TaskContext>,
83    ) -> Result<SendableRecordBatchStream> {
84        Ok(Box::pin(cooperative(
85            MemoryStream::try_new(
86                self.partitions[partition].clone(),
87                Arc::clone(&self.projected_schema),
88                self.projection.clone(),
89            )?
90            .with_fetch(self.fetch),
91        )))
92    }
93
94    fn as_any(&self) -> &dyn Any {
95        self
96    }
97
98    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
99        match t {
100            DisplayFormatType::Default | DisplayFormatType::Verbose => {
101                let partition_sizes: Vec<_> =
102                    self.partitions.iter().map(|b| b.len()).collect();
103
104                let output_ordering = self
105                    .sort_information
106                    .first()
107                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
108                    .unwrap_or_default();
109
110                let eq_properties = self.eq_properties();
111                let constraints = eq_properties.constraints();
112                let constraints = if constraints.is_empty() {
113                    String::new()
114                } else {
115                    format!(", {constraints}")
116                };
117
118                let limit = self
119                    .fetch
120                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
121                if self.show_sizes {
122                    write!(
123                        f,
124                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
125                        partition_sizes.len(),
126                    )
127                } else {
128                    write!(
129                        f,
130                        "partitions={}{limit}{output_ordering}{constraints}",
131                        partition_sizes.len(),
132                    )
133                }
134            }
135            DisplayFormatType::TreeRender => {
                let total_rows: usize = self
                    .partitions
                    .iter()
                    .flatten()
                    .map(|batch| batch.num_rows())
                    .sum();
137                let total_bytes: usize = self
138                    .partitions
139                    .iter()
140                    .flatten()
141                    .map(|batch| batch.get_array_memory_size())
142                    .sum();
143                writeln!(f, "format=memory")?;
144                writeln!(f, "rows={total_rows}")?;
145                writeln!(f, "bytes={total_bytes}")?;
146                Ok(())
147            }
148        }
149    }
150
    /// If possible, redistribute batches across partitions according to their size.
    ///
    /// Returns `Ok(None)` if unable to repartition. Preserves the output ordering
    /// if one exists: when an `output_ordering` is given this delegates to
    /// [`Self::repartition_preserving_order`], otherwise to
    /// [`Self::repartition_evenly_by_size`].
    /// Refer to [`DataSource::repartitioned`] for further details.
155    fn repartitioned(
156        &self,
157        target_partitions: usize,
158        _repartition_file_min_size: usize,
159        output_ordering: Option<LexOrdering>,
160    ) -> Result<Option<Arc<dyn DataSource>>> {
        // If there are no partitions, or we already have at least as many partitions
        // as desired, do not repartition.
        if self.partitions.is_empty() || self.partitions.len() >= target_partitions {
164            return Ok(None);
165        }
166
167        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
168            self.repartition_preserving_order(target_partitions, output_ordering)?
169        } else {
170            self.repartition_evenly_by_size(target_partitions)?
171        };
172
173        if let Some(repartitioned) = maybe_repartitioned {
174            Ok(Some(Arc::new(Self::try_new(
175                &repartitioned,
176                self.original_schema(),
177                self.projection.clone(),
178            )?)))
179        } else {
180            Ok(None)
181        }
182    }
183
184    fn output_partitioning(&self) -> Partitioning {
185        Partitioning::UnknownPartitioning(self.partitions.len())
186    }
187
188    fn eq_properties(&self) -> EquivalenceProperties {
189        EquivalenceProperties::new_with_orderings(
190            Arc::clone(&self.projected_schema),
191            self.sort_information.clone(),
192        )
193    }
194
195    fn scheduling_type(&self) -> SchedulingType {
196        SchedulingType::Cooperative
197    }
198
199    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
200        if let Some(partition) = partition {
201            // Compute statistics for a specific partition
202            if let Some(batches) = self.partitions.get(partition) {
203                Ok(common::compute_record_batch_statistics(
204                    from_ref(batches),
205                    &self.schema,
206                    self.projection.clone(),
207                ))
208            } else {
209                // Invalid partition index
210                Ok(Statistics::new_unknown(&self.projected_schema))
211            }
212        } else {
213            // Compute statistics across all partitions
214            Ok(common::compute_record_batch_statistics(
215                &self.partitions,
216                &self.schema,
217                self.projection.clone(),
218            ))
219        }
220    }
221
222    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
223        let source = self.clone();
224        Some(Arc::new(source.with_limit(limit)))
225    }
226
227    fn fetch(&self) -> Option<usize> {
228        self.fetch
229    }
230
231    fn try_swapping_with_projection(
232        &self,
233        projection: &ProjectionExprs,
234    ) -> Result<Option<Arc<dyn DataSource>>> {
        // If any expression is not a simple column reference (or carries an alias),
        // the projection cannot be removed.
        // This logic could live in MemoryExec instead, but that would blur the
        // responsibilities of the two operators.
237        let exprs = projection.iter().cloned().collect_vec();
238        all_alias_free_columns(exprs.as_slice())
239            .then(|| {
240                let all_projections = (0..self.schema.fields().len()).collect();
241                let new_projections = new_projections_for_columns(
242                    &exprs,
243                    self.projection().as_ref().unwrap_or(&all_projections),
244                );
245
246                MemorySourceConfig::try_new(
247                    self.partitions(),
248                    self.original_schema(),
249                    Some(new_projections),
250                )
251                .map(|s| Arc::new(s) as Arc<dyn DataSource>)
252            })
253            .transpose()
254    }
255}
256
257impl MemorySourceConfig {
    /// Create a new `MemorySourceConfig` for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
260    pub fn try_new(
261        partitions: &[Vec<RecordBatch>],
262        schema: SchemaRef,
263        projection: Option<Vec<usize>>,
264    ) -> Result<Self> {
265        let projected_schema = project_schema(&schema, projection.as_ref())?;
266        Ok(Self {
267            partitions: partitions.to_vec(),
268            schema,
269            projected_schema,
270            projection,
271            sort_information: vec![],
272            show_sizes: true,
273            fetch: None,
274        })
275    }
276
    /// Create a new `DataSourceExec` plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
279    pub fn try_new_exec(
280        partitions: &[Vec<RecordBatch>],
281        schema: SchemaRef,
282        projection: Option<Vec<usize>>,
283    ) -> Result<Arc<DataSourceExec>> {
284        let source = Self::try_new(partitions, schema, projection)?;
285        Ok(DataSourceExec::from_data_source(source))
286    }
287
    /// Create a new execution plan from a list of constant values (`ValuesExec`)
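    ///
    /// A sketch of building a two-row, single-column `VALUES` plan (the field name
    /// and `Int64` type are illustrative):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_physical_plan::expressions::lit;
    ///
    /// // VALUES (1), (2)
    /// let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Int64, false)]));
    /// let plan = MemorySourceConfig::try_new_as_values(
    ///     schema,
    ///     vec![vec![lit(1i64)], vec![lit(2i64)]],
    /// )?;
    /// ```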
289    #[expect(clippy::needless_pass_by_value)]
290    pub fn try_new_as_values(
291        schema: SchemaRef,
292        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
293    ) -> Result<Arc<DataSourceExec>> {
294        if data.is_empty() {
295            return plan_err!("Values list cannot be empty");
296        }
297
298        let n_row = data.len();
299        let n_col = schema.fields().len();
300
        // Use a single-row placeholder batch to satisfy the expressions' evaluation
        // argument and to generate exactly one output row per expression
303        let placeholder_schema = Arc::new(Schema::empty());
304        let placeholder_batch = RecordBatch::try_new_with_options(
305            Arc::clone(&placeholder_schema),
306            vec![],
307            &RecordBatchOptions::new().with_row_count(Some(1)),
308        )?;
309
310        // Evaluate each column
311        let arrays = (0..n_col)
312            .map(|j| {
313                (0..n_row)
314                    .map(|i| {
315                        let expr = &data[i][j];
316                        let result = expr.evaluate(&placeholder_batch)?;
317
318                        match result {
319                            ColumnarValue::Scalar(scalar) => Ok(scalar),
320                            ColumnarValue::Array(array) if array.len() == 1 => {
321                                ScalarValue::try_from_array(&array, 0)
322                            }
323                            ColumnarValue::Array(_) => {
324                                plan_err!("Cannot have array values in a values list")
325                            }
326                        }
327                    })
328                    .collect::<Result<Vec<_>>>()
329                    .and_then(ScalarValue::iter_to_array)
330            })
331            .collect::<Result<Vec<_>>>()?;
332
333        let batch = RecordBatch::try_new_with_options(
334            Arc::clone(&schema),
335            arrays,
336            &RecordBatchOptions::new().with_row_count(Some(n_row)),
337        )?;
338
339        let partitions = vec![batch];
340        Self::try_new_from_batches(Arc::clone(&schema), partitions)
341    }
342
    /// Create a new plan using the provided schema and batches.
    ///
    /// Errors if any of the batches don't match the provided schema, or if no
    /// batches are provided.
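    ///
    /// A sketch, assuming `batch` is an existing [`RecordBatch`] (all batches are
    /// placed into a single partition):
    ///
    /// ```rust,ignore
    /// let schema = batch.schema();
    /// let exec = MemorySourceConfig::try_new_from_batches(schema, vec![batch])?;
    /// ```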
347    #[expect(clippy::needless_pass_by_value)]
348    pub fn try_new_from_batches(
349        schema: SchemaRef,
350        batches: Vec<RecordBatch>,
351    ) -> Result<Arc<DataSourceExec>> {
352        if batches.is_empty() {
353            return plan_err!("Values list cannot be empty");
354        }
355
356        for batch in &batches {
357            let batch_schema = batch.schema();
358            if batch_schema != schema {
359                return plan_err!(
360                    "Batch has invalid schema. Expected: {}, got: {}",
361                    schema,
362                    batch_schema
363                );
364            }
365        }
366
367        let partitions = vec![batches];
368        let source = Self {
369            partitions,
370            schema: Arc::clone(&schema),
371            projected_schema: Arc::clone(&schema),
372            projection: None,
373            sort_information: vec![],
374            show_sizes: true,
375            fetch: None,
376        };
377        Ok(DataSourceExec::from_data_source(source))
378    }
379
    /// Set the maximum number of rows to read (the fetch limit)
381    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
382        self.fetch = limit;
383        self
384    }
385
386    /// Set `show_sizes` to determine whether to display partition sizes
387    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
388        self.show_sizes = show_sizes;
389        self
390    }
391
392    /// Ref to partitions
393    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
394        &self.partitions
395    }
396
397    /// Ref to projection
398    pub fn projection(&self) -> &Option<Vec<usize>> {
399        &self.projection
400    }
401
    /// Whether partition sizes are shown when displaying the plan
403    pub fn show_sizes(&self) -> bool {
404        self.show_sizes
405    }
406
407    /// Ref to sort information
408    pub fn sort_information(&self) -> &[LexOrdering] {
409        &self.sort_information
410    }
411
    /// A memory table can be ordered by multiple expressions simultaneously.
    /// [`EquivalenceProperties`] keeps track of expressions that describe the
    /// global ordering of the schema. These columns are not necessarily the same; e.g.
    /// ```text
    /// ┌-------┐
    /// | a | b |
    /// |---|---|
    /// | 1 | 9 |
    /// | 2 | 8 |
    /// | 3 | 7 |
    /// | 5 | 5 |
    /// └---┴---┘
    /// ```
    /// where both `a ASC` and `b DESC` can describe the table ordering. With
    /// [`EquivalenceProperties`], we can keep track of these equivalences
    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
    ///
    /// Note that if there is an internal projection, that projection will also
    /// be applied to the given `sort_information`.
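    ///
    /// A sketch of declaring an existing ordering on the source (`partitions` and
    /// `schema` are assumed to be defined as for [`Self::try_new`]):
    ///
    /// ```rust,ignore
    /// use arrow::compute::SortOptions;
    /// use datafusion_physical_expr::expressions::col;
    /// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    ///
    /// // The data is already sorted by `a ASC` (default sort options)
    /// let ordering: LexOrdering = [PhysicalSortExpr {
    ///     expr: col("a", &schema)?,
    ///     options: SortOptions::default(),
    /// }]
    /// .into();
    ///
    /// let source = MemorySourceConfig::try_new(&partitions, schema, None)?
    ///     .try_with_sort_information(vec![ordering])?;
    /// ```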
431    pub fn try_with_sort_information(
432        mut self,
433        mut sort_information: Vec<LexOrdering>,
434    ) -> Result<Self> {
435        // All sort expressions must refer to the original schema
436        let fields = self.schema.fields();
437        let ambiguous_column = sort_information
438            .iter()
439            .flat_map(|ordering| ordering.clone())
440            .flat_map(|expr| collect_columns(&expr.expr))
441            .find(|col| {
442                fields
443                    .get(col.index())
444                    .map(|field| field.name() != col.name())
445                    .unwrap_or(true)
446            });
447        assert_or_internal_err!(
448            ambiguous_column.is_none(),
449            "Column {:?} is not found in the original schema of the MemorySourceConfig",
450            ambiguous_column.as_ref().unwrap()
451        );
452
453        // If there is a projection on the source, we also need to project orderings
454        if self.projection.is_some() {
455            sort_information =
456                project_orderings(&sort_information, &self.projected_schema);
457        }
458
459        self.sort_information = sort_information;
460        Ok(self)
461    }
462
    /// Returns an `Arc` clone of the original (pre-projection) schema
464    pub fn original_schema(&self) -> SchemaRef {
465        Arc::clone(&self.schema)
466    }
467
    /// Repartition while preserving order.
    ///
    /// Returns `Ok(None)` if the requested repartitioning cannot be fulfilled,
    /// e.g. if there are too few batches to reach `target_partitions`, or if the
    /// output ordering cannot be preserved.
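    ///
    /// For example, starting from the ordered partitions
    /// `[[100_000, 10_000, 1_000], [2_000, 20]]` (row counts per batch) with
    /// `target_partitions = 3`, the largest partition is split once, yielding
    /// `[[100_000], [10_000, 1_000], [2_000, 20]]` while keeping the original
    /// batch order (see `test_repartition_with_sort_information` below).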
473    fn repartition_preserving_order(
474        &self,
475        target_partitions: usize,
476        output_ordering: LexOrdering,
477    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
478        if !self.eq_properties().ordering_satisfy(output_ordering)? {
479            Ok(None)
480        } else {
481            let total_num_batches =
482                self.partitions.iter().map(|b| b.len()).sum::<usize>();
483            if total_num_batches < target_partitions {
484                // no way to create the desired repartitioning
485                return Ok(None);
486            }
487
488            let cnt_to_repartition = target_partitions - self.partitions.len();
489
            // Label the current partitions with their position in the output order,
            // so that when partitions are split into smaller pieces later, the
            // original ordering is maintained.
492            let to_repartition = self
493                .partitions
494                .iter()
495                .enumerate()
496                .map(|(idx, batches)| RePartition {
497                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
498                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
499                    batches: batches.clone(),
500                })
501                .collect_vec();
502
            // Put all of the partitions into a max-heap ordered by `CompareByRowCount`,
            // i.e. by each partition's row count.
505            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
506            for rep in to_repartition {
507                max_heap.push(CompareByRowCount(rep));
508            }
509
            // Split the largest partitions into smaller ones, while maintaining the
            // output order of both the existing and the newly created partitions.
512            let mut cannot_split_further = Vec::with_capacity(target_partitions);
513            for _ in 0..cnt_to_repartition {
                // One split attempt per additional partition needed: if we need
                // another 4 partitions, we attempt to split 4 times.
515                loop {
516                    // Take the largest item off the heap, and attempt to split.
517                    let Some(to_split) = max_heap.pop() else {
518                        // Nothing left to attempt repartition. Break inner loop.
519                        break;
520                    };
521
522                    // Split the partition. The new partitions will be ordered with idx and idx+1.
523                    let mut new_partitions = to_split.into_inner().split();
524                    if new_partitions.len() > 1 {
525                        for new_partition in new_partitions {
526                            max_heap.push(CompareByRowCount(new_partition));
527                        }
528                        // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
529                        break;
530                    } else {
531                        cannot_split_further.push(new_partitions.remove(0));
532                    }
533                }
534            }
535            let mut partitions = max_heap
536                .drain()
537                .map(CompareByRowCount::into_inner)
538                .collect_vec();
539            partitions.extend(cannot_split_further);
540
            // Finally, sort all partitions by their output-order index. This restores
            // the original ordering of the batches across partitions.
543            partitions.sort_by_key(|p| p.idx);
544            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
545
546            Ok(Some(partitions))
547        }
548    }
549
    /// Repartition into evenly sized chunks (as much as possible without batch splitting),
    /// disregarding any ordering.
    ///
    /// The current implementation uses first-fit-decreasing bin packing, modified so that
    /// it still returns the desired count of `target_partitions`.
    ///
    /// Returns `Ok(None)` if the requested repartitioning cannot be fulfilled, e.g.
    /// if there are too few batches to reach `target_partitions`.
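    ///
    /// For example, with batches of 100_000, 10_000, 2_000, 1_000 and 20 rows and
    /// `target_partitions = 3`, the batches are packed largest-first into roughly
    /// even partitions: `[100_000]`, `[10_000]` and `[2_000, 1_000, 20]` rows
    /// respectively (see the repartitioning tests below).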
558    fn repartition_evenly_by_size(
559        &self,
560        target_partitions: usize,
561    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
562        // determine if we have enough total batches to fulfill request
563        let mut flatten_batches =
564            self.partitions.clone().into_iter().flatten().collect_vec();
565        if flatten_batches.len() < target_partitions {
566            return Ok(None);
567        }
568
        // Take all flattened batches (all in one partition/vec) and divide them evenly
        // into the desired number of `target_partitions`.
570        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
571        // sort by size, so we pack multiple smaller batches into the same partition
572        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
573
574        // Divide.
575        let mut partitions =
576            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
577        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
578        let mut total_rows_seen = 0;
579        let mut curr_bin_row_count = 0;
580        let mut idx = 0;
581        for batch in flatten_batches {
582            let row_cnt = batch.num_rows();
583            idx = std::cmp::min(idx, target_partitions - 1);
584
585            partitions[idx].push(batch);
586            curr_bin_row_count += row_cnt;
587            total_rows_seen += row_cnt;
588
589            if curr_bin_row_count >= target_partition_size {
590                idx += 1;
591                curr_bin_row_count = 0;
592
593                // update target_partition_size, to handle very lopsided batch distributions
594                // while still returning the count of `target_partitions`
595                if total_rows_seen < total_num_rows {
596                    target_partition_size = (total_num_rows - total_rows_seen)
597                        .div_ceil(target_partitions - idx);
598                }
599            }
600        }
601
602        Ok(Some(partitions))
603    }
604}
605
/// For use in repartitioning: tracks the total size and original partition index.
///
/// Deliberately does not implement `Clone`, to avoid unnecessary copying of record
/// batches during repartitioning.
609struct RePartition {
    /// Position of this partition in the original output ordering.
    idx: usize,
    /// Total row count of the partition, used for heap ordering
    /// (i.e. for splitting up the largest partitions first).
    row_count: usize,
615    /// A partition containing record batches.
616    batches: Vec<RecordBatch>,
617}
618
619impl RePartition {
    /// Split this [`RePartition`] into 2 pieces, consuming self.
    ///
    /// Returns only 1 partition if it cannot be split further.
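    ///
    /// For example, a partition holding batches of 500, 300 and 200 rows
    /// (1_000 rows in total) splits at 500 rows into `[500]` and `[300, 200]`.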
623    fn split(self) -> Vec<Self> {
624        if self.batches.len() == 1 {
625            return vec![self];
626        }
627
628        let new_0 = RePartition {
629            idx: self.idx, // output ordering
630            row_count: 0,
631            batches: vec![],
632        };
633        let new_1 = RePartition {
634            idx: self.idx + 1, // output ordering +1
635            row_count: 0,
636            batches: vec![],
637        };
638        let split_pt = self.row_count / 2;
639
640        let [new_0, new_1] = self.batches.into_iter().fold(
641            [new_0, new_1],
642            |[mut new0, mut new1], batch| {
643                if new0.row_count < split_pt {
644                    new0.add_batch(batch);
645                } else {
646                    new1.add_batch(batch);
647                }
648                [new0, new1]
649            },
650        );
651        vec![new_0, new_1]
652    }
653
654    fn add_batch(&mut self, batch: RecordBatch) {
655        self.row_count += batch.num_rows();
656        self.batches.push(batch);
657    }
658}
659
660impl fmt::Display for RePartition {
661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662        write!(
663            f,
664            "{}rows-in-{}batches@{}",
665            self.row_count,
666            self.batches.len(),
667            self.idx
668        )
669    }
670}
671
672struct CompareByRowCount(RePartition);
673impl CompareByRowCount {
674    fn into_inner(self) -> RePartition {
675        self.0
676    }
677}
678impl Ord for CompareByRowCount {
679    fn cmp(&self, other: &Self) -> Ordering {
680        self.0.row_count.cmp(&other.0.row_count)
681    }
682}
683impl PartialOrd for CompareByRowCount {
684    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
685        Some(self.cmp(other))
686    }
687}
688impl PartialEq for CompareByRowCount {
689    fn eq(&self, other: &Self) -> bool {
690        // PartialEq must be consistent with PartialOrd
691        self.cmp(other) == Ordering::Equal
692    }
693}
694impl Eq for CompareByRowCount {}
695impl Deref for CompareByRowCount {
696    type Target = RePartition;
697    fn deref(&self) -> &Self::Target {
698        &self.0
699    }
700}
701
702/// Type alias for partition data
703pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
704
/// Implements [`DataSink`] for writing to a [`MemTable`]
///
/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
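///
/// A construction sketch (`schema` is assumed to be the `SchemaRef` of the batches
/// that will be written; the `write_all` call needs a record batch stream and a
/// task context, so it is only sketched in a comment):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// use tokio::sync::RwLock;
///
/// // A single, initially empty, target partition to append into
/// let partition: PartitionData = Arc::new(RwLock::new(vec![]));
/// let sink = MemSink::try_new(vec![Arc::clone(&partition)], schema)?;
///
/// // `sink.write_all(stream, &task_ctx).await?` then appends the incoming
/// // batches round-robin across the target partitions.
/// ```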
708pub struct MemSink {
709    /// Target locations for writing data
710    batches: Vec<PartitionData>,
711    schema: SchemaRef,
712}
713
714impl Debug for MemSink {
715    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716        f.debug_struct("MemSink")
717            .field("num_partitions", &self.batches.len())
718            .finish()
719    }
720}
721
722impl DisplayAs for MemSink {
723    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
724        match t {
725            DisplayFormatType::Default | DisplayFormatType::Verbose => {
726                let partition_count = self.batches.len();
727                write!(f, "MemoryTable (partitions={partition_count})")
728            }
729            DisplayFormatType::TreeRender => {
730                // TODO: collect info
731                write!(f, "")
732            }
733        }
734    }
735}
736
737impl MemSink {
738    /// Creates a new [`MemSink`].
739    ///
740    /// The caller is responsible for ensuring that there is at least one partition to insert into.
741    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
742        if batches.is_empty() {
743            return plan_err!("Cannot insert into MemTable with zero partitions");
744        }
745        Ok(Self { batches, schema })
746    }
747}
748
749#[async_trait]
750impl DataSink for MemSink {
751    fn as_any(&self) -> &dyn Any {
752        self
753    }
754
755    fn schema(&self) -> &SchemaRef {
756        &self.schema
757    }
758
759    async fn write_all(
760        &self,
761        mut data: SendableRecordBatchStream,
762        _context: &Arc<TaskContext>,
763    ) -> Result<u64> {
764        let num_partitions = self.batches.len();
765
        // buffer up the data round-robin style into `num_partitions` buckets
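        // (e.g. with 3 target partitions, successive batches land in buffers 0, 1, 2, 0, 1, ...)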
767
768        let mut new_batches = vec![vec![]; num_partitions];
769        let mut i = 0;
770        let mut row_count = 0;
771        while let Some(batch) = data.next().await.transpose()? {
772            row_count += batch.num_rows();
773            new_batches[i].push(batch);
774            i = (i + 1) % num_partitions;
775        }
776
777        // write the outputs into the batches
778        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
779            // Append all the new batches in one go to minimize locking overhead
780            target.write().await.append(&mut batches);
781        }
782
783        Ok(row_count as u64)
784    }
785}
786
787#[cfg(test)]
788mod memory_source_tests {
789    use std::sync::Arc;
790
791    use crate::memory::MemorySourceConfig;
792    use crate::source::DataSourceExec;
793
794    use arrow::compute::SortOptions;
795    use arrow::datatypes::{DataType, Field, Schema};
796    use datafusion_common::Result;
797    use datafusion_physical_expr::expressions::col;
798    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
799    use datafusion_physical_plan::ExecutionPlan;
800
801    #[test]
802    fn test_memory_order_eq() -> Result<()> {
803        let schema = Arc::new(Schema::new(vec![
804            Field::new("a", DataType::Int64, false),
805            Field::new("b", DataType::Int64, false),
806            Field::new("c", DataType::Int64, false),
807        ]));
808        let sort1: LexOrdering = [
809            PhysicalSortExpr {
810                expr: col("a", &schema)?,
811                options: SortOptions::default(),
812            },
813            PhysicalSortExpr {
814                expr: col("b", &schema)?,
815                options: SortOptions::default(),
816            },
817        ]
818        .into();
819        let sort2: LexOrdering = [PhysicalSortExpr {
820            expr: col("c", &schema)?,
821            options: SortOptions::default(),
822        }]
823        .into();
824        let mut expected_output_order = sort1.clone();
825        expected_output_order.extend(sort2.clone());
826
827        let sort_information = vec![sort1.clone(), sort2.clone()];
828        let mem_exec = DataSourceExec::from_data_source(
829            MemorySourceConfig::try_new(&[vec![]], schema, None)?
830                .try_with_sort_information(sort_information)?,
831        );
832
833        assert_eq!(
834            mem_exec.properties().output_ordering().unwrap(),
835            &expected_output_order
836        );
837        let eq_properties = mem_exec.properties().equivalence_properties();
838        assert!(eq_properties.oeq_class().contains(&sort1));
839        assert!(eq_properties.oeq_class().contains(&sort2));
840        Ok(())
841    }
842}
843
844#[cfg(test)]
845mod tests {
846    use super::*;
847    use crate::test_util::col;
848    use crate::tests::{aggr_test_schema, make_partition};
849
850    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
851    use arrow::datatypes::{DataType, Field};
852    use datafusion_common::assert_batches_eq;
853    use datafusion_common::stats::{ColumnStatistics, Precision};
854    use datafusion_physical_expr::PhysicalSortExpr;
855    use datafusion_physical_plan::expressions::lit;
856
857    use datafusion_physical_plan::ExecutionPlan;
858    use futures::StreamExt;
859
860    #[tokio::test]
861    async fn exec_with_limit() -> Result<()> {
862        let task_ctx = Arc::new(TaskContext::default());
863        let batch = make_partition(7);
864        let schema = batch.schema();
865        let batches = vec![batch.clone(), batch];
866
867        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
868        assert_eq!(exec.fetch(), None);
869
870        let exec = exec.with_fetch(Some(4)).unwrap();
871        assert_eq!(exec.fetch(), Some(4));
872
873        let mut it = exec.execute(0, task_ctx)?;
874        let mut results = vec![];
875        while let Some(batch) = it.next().await {
876            results.push(batch?);
877        }
878
879        let expected = [
880            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
881        ];
882        assert_batches_eq!(expected, &results);
883        Ok(())
884    }
885
886    #[tokio::test]
887    async fn values_empty_case() -> Result<()> {
888        let schema = aggr_test_schema();
889        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
890        assert!(empty.is_err());
891        Ok(())
892    }
893
894    #[test]
895    fn new_exec_with_batches() {
896        let batch = make_partition(7);
897        let schema = batch.schema();
898        let batches = vec![batch.clone(), batch];
899        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
900    }
901
902    #[test]
903    fn new_exec_with_batches_empty() {
904        let batch = make_partition(7);
905        let schema = batch.schema();
906        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
907    }
908
909    #[test]
910    fn new_exec_with_batches_invalid_schema() {
911        let batch = make_partition(7);
912        let batches = vec![batch.clone(), batch];
913
914        let invalid_schema = Arc::new(Schema::new(vec![
915            Field::new("col0", DataType::UInt32, false),
916            Field::new("col1", DataType::Utf8, false),
917        ]));
918        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
919            .unwrap_err();
920    }
921
922    // Test issue: https://github.com/apache/datafusion/issues/8763
923    #[test]
924    fn new_exec_with_non_nullable_schema() {
925        let schema = Arc::new(Schema::new(vec![Field::new(
926            "col0",
927            DataType::UInt32,
928            false,
929        )]));
930        let _ = MemorySourceConfig::try_new_as_values(
931            Arc::clone(&schema),
932            vec![vec![lit(1u32)]],
933        )
934        .unwrap();
935        // Test that a null value is rejected
936        let _ = MemorySourceConfig::try_new_as_values(
937            schema,
938            vec![vec![lit(ScalarValue::UInt32(None))]],
939        )
940        .unwrap_err();
941    }
942
943    #[test]
944    fn values_stats_with_nulls_only() -> Result<()> {
945        let data = vec![
946            vec![lit(ScalarValue::Null)],
947            vec![lit(ScalarValue::Null)],
948            vec![lit(ScalarValue::Null)],
949        ];
950        let rows = data.len();
951        let schema =
952            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
953        let values = MemorySourceConfig::try_new_as_values(schema, data)?;
954
955        assert_eq!(
956            values.partition_statistics(None)?,
957            Statistics {
958                num_rows: Precision::Exact(rows),
959                total_byte_size: Precision::Exact(8), // not important
960                column_statistics: vec![ColumnStatistics {
961                    null_count: Precision::Exact(rows), // there are only nulls
962                    distinct_count: Precision::Absent,
963                    max_value: Precision::Absent,
964                    min_value: Precision::Absent,
965                    sum_value: Precision::Absent,
966                    byte_size: Precision::Absent,
967                },],
968            }
969        );
970
971        Ok(())
972    }
973
974    fn batch(row_size: usize) -> RecordBatch {
975        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
976        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
977        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
978        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
979    }
980
981    fn schema() -> SchemaRef {
982        batch(1).schema()
983    }
984
985    fn memorysrcconfig_no_partitions(
986        sort_information: Vec<LexOrdering>,
987    ) -> Result<MemorySourceConfig> {
988        let partitions = vec![];
989        MemorySourceConfig::try_new(&partitions, schema(), None)?
990            .try_with_sort_information(sort_information)
991    }
992
993    fn memorysrcconfig_1_partition_1_batch(
994        sort_information: Vec<LexOrdering>,
995    ) -> Result<MemorySourceConfig> {
996        let partitions = vec![vec![batch(100)]];
997        MemorySourceConfig::try_new(&partitions, schema(), None)?
998            .try_with_sort_information(sort_information)
999    }
1000
1001    fn memorysrcconfig_3_partitions_1_batch_each(
1002        sort_information: Vec<LexOrdering>,
1003    ) -> Result<MemorySourceConfig> {
1004        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
1005        MemorySourceConfig::try_new(&partitions, schema(), None)?
1006            .try_with_sort_information(sort_information)
1007    }
1008
1009    fn memorysrcconfig_3_partitions_with_2_batches_each(
1010        sort_information: Vec<LexOrdering>,
1011    ) -> Result<MemorySourceConfig> {
1012        let partitions = vec![
1013            vec![batch(100), batch(100)],
1014            vec![batch(100), batch(100)],
1015            vec![batch(100), batch(100)],
1016        ];
1017        MemorySourceConfig::try_new(&partitions, schema(), None)?
1018            .try_with_sort_information(sort_information)
1019    }
1020
1021    /// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
1022    /// in the Memtable partition (a.k.a. vector of batches).
1023    fn memorysrcconfig_1_partition_with_different_sized_batches(
1024        sort_information: Vec<LexOrdering>,
1025    ) -> Result<MemorySourceConfig> {
1026        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1027        MemorySourceConfig::try_new(&partitions, schema(), None)?
1028            .try_with_sort_information(sort_information)
1029    }
1030
1031    /// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
1032    /// but the batches are ordered differently (not by size)
1033    /// in the Memtable partition (a.k.a. vector of batches).
1034    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1035        sort_information: Vec<LexOrdering>,
1036    ) -> Result<MemorySourceConfig> {
1037        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1038        MemorySourceConfig::try_new(&partitions, schema(), None)?
1039            .try_with_sort_information(sort_information)
1040    }
1041
1042    fn memorysrcconfig_2_partition_with_different_sized_batches(
1043        sort_information: Vec<LexOrdering>,
1044    ) -> Result<MemorySourceConfig> {
1045        let partitions = vec![
1046            vec![batch(100_000), batch(10_000), batch(1_000)],
1047            vec![batch(2_000), batch(20)],
1048        ];
1049        MemorySourceConfig::try_new(&partitions, schema(), None)?
1050            .try_with_sort_information(sort_information)
1051    }
1052
1053    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1054        sort_information: Vec<LexOrdering>,
1055    ) -> Result<MemorySourceConfig> {
1056        let partitions = vec![
1057            vec![
1058                batch(100_000),
1059                batch(1),
1060                batch(1),
1061                batch(1),
1062                batch(1),
1063                batch(0),
1064            ],
1065            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1066        ];
1067        MemorySourceConfig::try_new(&partitions, schema(), None)?
1068            .try_with_sort_information(sort_information)
1069    }
1070
    /// Assert that we get the expected count of partitions after repartitioning.
    ///
    /// If `None`, then [`DataSource::repartitioned`] is expected to return `None`.
1074    fn assert_partitioning(
1075        partitioned_datasrc: Option<Arc<dyn DataSource>>,
1076        partition_cnt: Option<usize>,
1077    ) {
1078        let should_exist = if let Some(partition_cnt) = partition_cnt {
1079            format!("new datasource should exist and have {partition_cnt:?} partitions")
1080        } else {
1081            "new datasource should not exist".into()
1082        };
1083
1084        let actual = partitioned_datasrc
1085            .map(|datasrc| datasrc.output_partitioning().partition_count());
1086        assert_eq!(
1087            actual, partition_cnt,
1088            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1089        );
1090    }
1091
1092    fn run_all_test_scenarios(
1093        output_ordering: Option<LexOrdering>,
1094        sort_information_on_config: Vec<LexOrdering>,
1095    ) -> Result<()> {
1096        let not_used = usize::MAX;
1097
1098        // src has no partitions
1099        let mem_src_config =
1100            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1101        let partitioned_datasrc =
1102            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1103        assert_partitioning(partitioned_datasrc, None);
1104
1105        // src has partitions == target partitions (=1)
1106        let target_partitions = 1;
1107        let mem_src_config =
1108            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1109        let partitioned_datasrc = mem_src_config.repartitioned(
1110            target_partitions,
1111            not_used,
1112            output_ordering.clone(),
1113        )?;
1114        assert_partitioning(partitioned_datasrc, None);
1115
1116        // src has partitions == target partitions (=3)
1117        let target_partitions = 3;
1118        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1119            sort_information_on_config.clone(),
1120        )?;
1121        let partitioned_datasrc = mem_src_config.repartitioned(
1122            target_partitions,
1123            not_used,
1124            output_ordering.clone(),
1125        )?;
1126        assert_partitioning(partitioned_datasrc, None);
1127
1128        // src has partitions > target partitions, but we don't merge them
1129        let target_partitions = 2;
1130        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1131            sort_information_on_config.clone(),
1132        )?;
1133        let partitioned_datasrc = mem_src_config.repartitioned(
1134            target_partitions,
1135            not_used,
1136            output_ordering.clone(),
1137        )?;
1138        assert_partitioning(partitioned_datasrc, None);
1139
1140        // src has partitions < target partitions, but not enough batches (per partition) to split into more partitions
1141        let target_partitions = 4;
1142        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1143            sort_information_on_config.clone(),
1144        )?;
1145        let partitioned_datasrc = mem_src_config.repartitioned(
1146            target_partitions,
1147            not_used,
1148            output_ordering.clone(),
1149        )?;
1150        assert_partitioning(partitioned_datasrc, None);
1151
        // src has partitions < target partitions, and can split to a sufficient amount.
        // It has 6 batches across 3 partitions and will need to split 2 of its partitions.
1154        let target_partitions = 5;
1155        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1156            sort_information_on_config.clone(),
1157        )?;
1158        let partitioned_datasrc = mem_src_config.repartitioned(
1159            target_partitions,
1160            not_used,
1161            output_ordering.clone(),
1162        )?;
1163        assert_partitioning(partitioned_datasrc, Some(5));
1164
        // src has partitions < target partitions, and can split to a sufficient amount.
        // It has 6 batches across 3 partitions and will need to split all of its partitions.
1167        let target_partitions = 6;
1168        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1169            sort_information_on_config.clone(),
1170        )?;
1171        let partitioned_datasrc = mem_src_config.repartitioned(
1172            target_partitions,
1173            not_used,
1174            output_ordering.clone(),
1175        )?;
1176        assert_partitioning(partitioned_datasrc, Some(6));
1177
1178        // src has partitions < target partitions, but not enough total batches to fulfill the split (desired target_partitions)
1179        let target_partitions = 3 * 2 + 1;
1180        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1181            sort_information_on_config.clone(),
1182        )?;
1183        let partitioned_datasrc = mem_src_config.repartitioned(
1184            target_partitions,
1185            not_used,
1186            output_ordering.clone(),
1187        )?;
1188        assert_partitioning(partitioned_datasrc, None);
1189
1190        // src has 1 partition with many batches of lopsided sizes
1191        // make sure it handles the split properly
1192        let target_partitions = 2;
1193        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1194            sort_information_on_config,
1195        )?;
1196        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1197            target_partitions,
1198            not_used,
1199            output_ordering,
1200        )?;
1201        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1202        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
1203        // It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
1204        let partitioned_datasrc = partitioned_datasrc.unwrap();
1205        let Some(mem_src_config) = partitioned_datasrc
1206            .as_any()
1207            .downcast_ref::<MemorySourceConfig>()
1208        else {
1209            unreachable!()
1210        };
1211        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1212        assert_eq!(repartitioned_raw_batches.len(), 2);
1213        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1214            unreachable!()
1215        };
1216        // p1=batch(100_000)
1217        assert_eq!(p1.len(), 1);
1218        assert_eq!(p1[0].num_rows(), 100_000);
1219        // p2=[batch(10_000), batch(100), batch(1)]
1220        assert_eq!(p2.len(), 3);
1221        assert_eq!(p2[0].num_rows(), 10_000);
1222        assert_eq!(p2[1].num_rows(), 100);
1223        assert_eq!(p2[2].num_rows(), 1);
1224
1225        Ok(())
1226    }
1227
1228    #[test]
1229    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
1230        let no_sort = vec![];
1231        let no_output_ordering = None;
1232
1233        // Test: common set of functionality
1234        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
1235
1236        // Test: how no-sort-order divides differently.
1237        //    * does not preserve separate partitions (with own internal ordering) on even split,
1238        //    * nor does it preserve ordering (re-orders batch(2_000) vs batch(1_000)).
1239        let target_partitions = 3;
1240        let mem_src_config =
1241            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
1242        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1243            target_partitions,
1244            usize::MAX,
1245            no_output_ordering,
1246        )?;
1247        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1248        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1249        // It should have split as p1=batch(100_000), p2=batch(10_000),  p3=rest(mixed across original partitions)
1250        let repartitioned_raw_batches = mem_src_config
1251            .repartition_evenly_by_size(target_partitions)?
1252            .unwrap();
1253        assert_eq!(repartitioned_raw_batches.len(), 3);
1254        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1255            unreachable!()
1256        };
1257        // p1=batch(100_000)
1258        assert_eq!(p1.len(), 1);
1259        assert_eq!(p1[0].num_rows(), 100_000);
1260        // p2=batch(10_000)
1261        assert_eq!(p2.len(), 1);
1262        assert_eq!(p2[0].num_rows(), 10_000);
1263        // p3= batch(2_000), batch(1_000), batch(20)
1264        assert_eq!(p3.len(), 3);
1265        assert_eq!(p3[0].num_rows(), 2_000);
1266        assert_eq!(p3[1].num_rows(), 1_000);
1267        assert_eq!(p3[2].num_rows(), 20);
1268
1269        Ok(())
1270    }
1271
1272    #[test]
1273    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches()
1274    -> Result<()> {
1275        let no_sort = vec![];
1276        let no_output_ordering = None;
1277
1278        // Test: case has two input partitions:
1279        //     b(100_000), b(1), b(1), b(1), b(1), b(0)
1280        //     b(1), b(1), b(1), b(1), b(0), b(100)
1281        //
1282        // We want an output with target_partitions=5, which means the ideal division is:
1283        //     b(100_000)
1284        //     b(100)
1285        //     b(1), b(1), b(1)
1286        //     b(1), b(1), b(1)
        //     b(1), b(1), b(0), b(0)
1288        let target_partitions = 5;
1289        let mem_src_config =
1290            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
1291        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1292            target_partitions,
1293            usize::MAX,
1294            no_output_ordering,
1295        )?;
1296        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Starting partition 1 = batch(100_000), batch(1), batch(1), batch(1), batch(1), batch(0)
        // Starting partition 2 = batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)
        // It should have split as p1=batch(100_000), p2=batch(100),
        // p3=[batch(1),batch(1),batch(1)], p4=[batch(1),batch(1),batch(1)],
        // p5=[batch(1),batch(1),batch(0),batch(0)]
1300        let repartitioned_raw_batches = mem_src_config
1301            .repartition_evenly_by_size(target_partitions)?
1302            .unwrap();
1303        assert_eq!(repartitioned_raw_batches.len(), 5);
1304        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
1305        else {
1306            unreachable!()
1307        };
1308        // p1=batch(100_000)
1309        assert_eq!(p1.len(), 1);
1310        assert_eq!(p1[0].num_rows(), 100_000);
1311        // p2=batch(100)
1312        assert_eq!(p2.len(), 1);
1313        assert_eq!(p2[0].num_rows(), 100);
1314        // p3=[batch(1),batch(1),batch(1)]
1315        assert_eq!(p3.len(), 3);
1316        assert_eq!(p3[0].num_rows(), 1);
1317        assert_eq!(p3[1].num_rows(), 1);
1318        assert_eq!(p3[2].num_rows(), 1);
1319        // p4=[batch(1),batch(1),batch(1)]
1320        assert_eq!(p4.len(), 3);
1321        assert_eq!(p4[0].num_rows(), 1);
1322        assert_eq!(p4[1].num_rows(), 1);
1323        assert_eq!(p4[2].num_rows(), 1);
1324        // p5=[batch(1),batch(1),batch(0),batch(0)]
1325        assert_eq!(p5.len(), 4);
1326        assert_eq!(p5[0].num_rows(), 1);
1327        assert_eq!(p5[1].num_rows(), 1);
1328        assert_eq!(p5[2].num_rows(), 0);
1329        assert_eq!(p5[3].num_rows(), 0);
1330
1331        Ok(())
1332    }
1333
1334    #[test]
1335    fn test_repartition_with_sort_information() -> Result<()> {
1336        let schema = schema();
1337        let sort_key: LexOrdering =
1338            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1339        let has_sort = vec![sort_key.clone()];
1340        let output_ordering = Some(sort_key);
1341
1342        // Test: common set of functionality
1343        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
1344
1345        // Test: DOES preserve separate partitions (with own internal ordering)
1346        let target_partitions = 3;
1347        let mem_src_config =
1348            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
1349        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1350            target_partitions,
1351            usize::MAX,
1352            output_ordering.clone(),
1353        )?;
1354        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1355        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1356        // It should have split as p1=batch(100_000), p2=[batch(10_000),batch(1_000)],  p3=<other_partition>
1357        let Some(output_ord) = output_ordering else {
1358            unreachable!()
1359        };
1360        let repartitioned_raw_batches = mem_src_config
1361            .repartition_preserving_order(target_partitions, output_ord)?
1362            .unwrap();
1363        assert_eq!(repartitioned_raw_batches.len(), 3);
1364        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1365            unreachable!()
1366        };
1367        // p1=batch(100_000)
1368        assert_eq!(p1.len(), 1);
1369        assert_eq!(p1[0].num_rows(), 100_000);
1370        // p2=[batch(10_000),batch(1_000)]
1371        assert_eq!(p2.len(), 2);
1372        assert_eq!(p2[0].num_rows(), 10_000);
1373        assert_eq!(p2[1].num_rows(), 1_000);
1374        // p3=batch(2_000), batch(20)
1375        assert_eq!(p3.len(), 2);
1376        assert_eq!(p3[0].num_rows(), 2_000);
1377        assert_eq!(p3[1].num_rows(), 20);
1378
1379        Ok(())
1380    }
1381
1382    #[test]
1383    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1384        let schema = schema();
1385        let sort_key: LexOrdering =
1386            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1387        let has_sort = vec![sort_key.clone()];
1388        let output_ordering = Some(sort_key);
1389
1390        // src has 1 partition with many batches of lopsided sizes
1391        // note that the input vector of batches are not ordered by decreasing size
1392        let target_partitions = 2;
1393        let mem_src_config =
1394            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1395        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1396            target_partitions,
1397            usize::MAX,
1398            output_ordering,
1399        )?;
1400        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1401        // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
1402        // It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
1403        let partitioned_datasrc = partitioned_datasrc.unwrap();
1404        let Some(mem_src_config) = partitioned_datasrc
1405            .as_any()
1406            .downcast_ref::<MemorySourceConfig>()
1407        else {
1408            unreachable!()
1409        };
1410        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1411        assert_eq!(repartitioned_raw_batches.len(), 2);
1412        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1413            unreachable!()
1414        };
1415        // p1=batch(100_000)
1416        assert_eq!(p1.len(), 1);
1417        assert_eq!(p1[0].num_rows(), 100_000);
1418        // p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
1419        assert_eq!(p2.len(), 3);
1420        assert_eq!(p2[0].num_rows(), 1);
1421        assert_eq!(p2[1].num_rows(), 100);
1422        assert_eq!(p2[2].num_rows(), 10_000);
1423
1424        Ok(())
1425    }
1426}