Skip to main content

datafusion_datasource/
memory.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::fmt;
21use std::fmt::Debug;
22use std::ops::Deref;
23use std::slice::from_ref;
24use std::sync::Arc;
25
26use crate::sink::DataSink;
27use crate::source::{DataSource, DataSourceExec};
28
29use arrow::array::{RecordBatch, RecordBatchOptions};
30use arrow::datatypes::{Schema, SchemaRef};
31use datafusion_common::{
32    Result, ScalarValue, assert_or_internal_err, plan_err, project_schema,
33};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::equivalence::project_orderings;
36use datafusion_physical_expr::projection::ProjectionExprs;
37use datafusion_physical_expr::utils::collect_columns;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39use datafusion_physical_plan::memory::MemoryStream;
40use datafusion_physical_plan::projection::{
41    all_alias_free_columns, new_projections_for_columns,
42};
43use datafusion_physical_plan::{
44    ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
45    SendableRecordBatchStream, Statistics, common,
46};
47
48use async_trait::async_trait;
49use datafusion_physical_plan::coop::cooperative;
50use datafusion_physical_plan::execution_plan::SchedulingType;
51use futures::StreamExt;
52use itertools::Itertools;
53use tokio::sync::RwLock;
54
55/// Data source configuration for reading in-memory batches of data
56#[derive(Clone, Debug)]
57pub struct MemorySourceConfig {
58    /// The partitions to query.
59    ///
60    /// Each partition is a `Vec<RecordBatch>`.
61    partitions: Vec<Vec<RecordBatch>>,
62    /// Schema representing the data before projection
63    schema: SchemaRef,
64    /// Schema representing the data after the optional projection is applied
65    projected_schema: SchemaRef,
66    /// Optional projection
67    projection: Option<Vec<usize>>,
68    /// Sort information: one or more equivalent orderings
69    sort_information: Vec<LexOrdering>,
70    /// if partition sizes should be displayed
71    show_sizes: bool,
72    /// The maximum number of records to read from this plan. If `None`,
73    /// all records after filtering are returned.
74    fetch: Option<usize>,
75}
76
77impl DataSource for MemorySourceConfig {
78    fn open(
79        &self,
80        partition: usize,
81        _context: Arc<TaskContext>,
82    ) -> Result<SendableRecordBatchStream> {
83        Ok(Box::pin(cooperative(
84            MemoryStream::try_new(
85                self.partitions[partition].clone(),
86                Arc::clone(&self.projected_schema),
87                self.projection.clone(),
88            )?
89            .with_fetch(self.fetch),
90        )))
91    }
92
93    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
94        match t {
95            DisplayFormatType::Default | DisplayFormatType::Verbose => {
96                let partition_sizes: Vec<_> =
97                    self.partitions.iter().map(|b| b.len()).collect();
98
99                let output_ordering = self
100                    .sort_information
101                    .first()
102                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
103                    .unwrap_or_default();
104
105                let eq_properties = self.eq_properties();
106                let constraints = eq_properties.constraints();
107                let constraints = if constraints.is_empty() {
108                    String::new()
109                } else {
110                    format!(", {constraints}")
111                };
112
113                let limit = self
114                    .fetch
115                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
116                if self.show_sizes {
117                    write!(
118                        f,
119                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
120                        partition_sizes.len(),
121                    )
122                } else {
123                    write!(
124                        f,
125                        "partitions={}{limit}{output_ordering}{constraints}",
126                        partition_sizes.len(),
127                    )
128                }
129            }
130            DisplayFormatType::TreeRender => {
131                let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
132                let total_bytes: usize = self
133                    .partitions
134                    .iter()
135                    .flatten()
136                    .map(|batch| batch.get_array_memory_size())
137                    .sum();
138                writeln!(f, "format=memory")?;
139                writeln!(f, "rows={total_rows}")?;
140                writeln!(f, "bytes={total_bytes}")?;
141                Ok(())
142            }
143        }
144    }
145
146    /// If possible, redistribute batches across partitions according to their size.
147    ///
148    /// Returns `Ok(None)` if unable to repartition. Preserve output ordering if exists.
149    /// Refer to [`DataSource::repartitioned`] for further details.
150    fn repartitioned(
151        &self,
152        target_partitions: usize,
153        _repartition_file_min_size: usize,
154        output_ordering: Option<LexOrdering>,
155    ) -> Result<Option<Arc<dyn DataSource>>> {
156        if self.partitions.is_empty() || self.partitions.len() >= target_partitions
157        // if have no partitions, or already have more partitions than desired, do not repartition
158        {
159            return Ok(None);
160        }
161
162        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
163            self.repartition_preserving_order(target_partitions, output_ordering)?
164        } else {
165            self.repartition_evenly_by_size(target_partitions)?
166        };
167
168        if let Some(repartitioned) = maybe_repartitioned {
169            Ok(Some(Arc::new(Self::try_new(
170                &repartitioned,
171                self.original_schema(),
172                self.projection.clone(),
173            )?)))
174        } else {
175            Ok(None)
176        }
177    }
178
179    fn output_partitioning(&self) -> Partitioning {
180        Partitioning::UnknownPartitioning(self.partitions.len())
181    }
182
183    fn eq_properties(&self) -> EquivalenceProperties {
184        EquivalenceProperties::new_with_orderings(
185            Arc::clone(&self.projected_schema),
186            self.sort_information.clone(),
187        )
188    }
189
190    fn scheduling_type(&self) -> SchedulingType {
191        SchedulingType::Cooperative
192    }
193
194    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
195        if let Some(partition) = partition {
196            // Compute statistics for a specific partition
197            if let Some(batches) = self.partitions.get(partition) {
198                Ok(Arc::new(common::compute_record_batch_statistics(
199                    from_ref(batches),
200                    &self.schema,
201                    self.projection.clone(),
202                )))
203            } else {
204                // Invalid partition index
205                Ok(Arc::new(Statistics::new_unknown(&self.projected_schema)))
206            }
207        } else {
208            // Compute statistics across all partitions
209            Ok(Arc::new(common::compute_record_batch_statistics(
210                &self.partitions,
211                &self.schema,
212                self.projection.clone(),
213            )))
214        }
215    }
216
217    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
218        let source = self.clone();
219        Some(Arc::new(source.with_limit(limit)))
220    }
221
222    fn fetch(&self) -> Option<usize> {
223        self.fetch
224    }
225
226    fn try_swapping_with_projection(
227        &self,
228        projection: &ProjectionExprs,
229    ) -> Result<Option<Arc<dyn DataSource>>> {
230        // If there is any non-column or alias-carrier expression, Projection should not be removed.
231        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
232        let exprs = projection.iter().cloned().collect_vec();
233        all_alias_free_columns(exprs.as_slice())
234            .then(|| {
235                let all_projections = (0..self.schema.fields().len()).collect();
236                let new_projections = new_projections_for_columns(
237                    &exprs,
238                    self.projection().as_ref().unwrap_or(&all_projections),
239                );
240                let projected_schema =
241                    project_schema(&self.schema, Some(&new_projections));
242
243                projected_schema.map(|projected_schema| {
244                    // Clone self to preserve all metadata (fetch, sort_information,
245                    // show_sizes, etc.) then update only the projection-related fields.
246                    let mut new_source = self.clone();
247                    new_source.projection = Some(new_projections);
248                    new_source.projected_schema = projected_schema;
249                    // Project sort information to match the new projection
250                    new_source.sort_information = project_orderings(
251                        &new_source.sort_information,
252                        &new_source.projected_schema,
253                    );
254                    Arc::new(new_source) as Arc<dyn DataSource>
255                })
256            })
257            .transpose()
258    }
259}
260
261impl MemorySourceConfig {
262    /// Create a new `MemorySourceConfig` for reading in-memory record batches
263    /// The provided `schema` should not have the projection applied.
264    pub fn try_new(
265        partitions: &[Vec<RecordBatch>],
266        schema: SchemaRef,
267        projection: Option<Vec<usize>>,
268    ) -> Result<Self> {
269        let projected_schema = project_schema(&schema, projection.as_ref())?;
270        Ok(Self {
271            partitions: partitions.to_vec(),
272            schema,
273            projected_schema,
274            projection,
275            sort_information: vec![],
276            show_sizes: true,
277            fetch: None,
278        })
279    }
280
281    /// Create a new `DataSourceExec` plan for reading in-memory record batches
282    /// The provided `schema` should not have the projection applied.
283    pub fn try_new_exec(
284        partitions: &[Vec<RecordBatch>],
285        schema: SchemaRef,
286        projection: Option<Vec<usize>>,
287    ) -> Result<Arc<DataSourceExec>> {
288        let source = Self::try_new(partitions, schema, projection)?;
289        Ok(DataSourceExec::from_data_source(source))
290    }
291
292    /// Create a new execution plan from a list of constant values (`ValuesExec`)
293    #[expect(clippy::needless_pass_by_value)]
294    pub fn try_new_as_values(
295        schema: SchemaRef,
296        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
297    ) -> Result<Arc<DataSourceExec>> {
298        if data.is_empty() {
299            return plan_err!("Values list cannot be empty");
300        }
301
302        let n_row = data.len();
303        let n_col = schema.fields().len();
304
305        // We have this single row batch as a placeholder to satisfy evaluation argument
306        // and generate a single output row
307        let placeholder_schema = Arc::new(Schema::empty());
308        let placeholder_batch = RecordBatch::try_new_with_options(
309            Arc::clone(&placeholder_schema),
310            vec![],
311            &RecordBatchOptions::new().with_row_count(Some(1)),
312        )?;
313
314        // Evaluate each column
315        let arrays = (0..n_col)
316            .map(|j| {
317                (0..n_row)
318                    .map(|i| {
319                        let expr = &data[i][j];
320                        let result = expr.evaluate(&placeholder_batch)?;
321
322                        match result {
323                            ColumnarValue::Scalar(scalar) => Ok(scalar),
324                            ColumnarValue::Array(array) if array.len() == 1 => {
325                                ScalarValue::try_from_array(&array, 0)
326                            }
327                            ColumnarValue::Array(_) => {
328                                plan_err!("Cannot have array values in a values list")
329                            }
330                        }
331                    })
332                    .collect::<Result<Vec<_>>>()
333                    .and_then(ScalarValue::iter_to_array)
334            })
335            .collect::<Result<Vec<_>>>()?;
336
337        let batch = RecordBatch::try_new_with_options(
338            Arc::clone(&schema),
339            arrays,
340            &RecordBatchOptions::new().with_row_count(Some(n_row)),
341        )?;
342
343        let partitions = vec![batch];
344        Self::try_new_from_batches(Arc::clone(&schema), partitions)
345    }
346
347    /// Create a new plan using the provided schema and batches.
348    ///
349    /// Errors if any of the batches don't match the provided schema, or if no
350    /// batches are provided.
351    #[expect(clippy::needless_pass_by_value)]
352    pub fn try_new_from_batches(
353        schema: SchemaRef,
354        batches: Vec<RecordBatch>,
355    ) -> Result<Arc<DataSourceExec>> {
356        if batches.is_empty() {
357            return plan_err!("Values list cannot be empty");
358        }
359
360        for batch in &batches {
361            let batch_schema = batch.schema();
362            if batch_schema != schema {
363                return plan_err!(
364                    "Batch has invalid schema. Expected: {}, got: {}",
365                    schema,
366                    batch_schema
367                );
368            }
369        }
370
371        let partitions = vec![batches];
372        let source = Self {
373            partitions,
374            schema: Arc::clone(&schema),
375            projected_schema: Arc::clone(&schema),
376            projection: None,
377            sort_information: vec![],
378            show_sizes: true,
379            fetch: None,
380        };
381        Ok(DataSourceExec::from_data_source(source))
382    }
383
384    /// Set the limit of the files
385    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
386        self.fetch = limit;
387        self
388    }
389
390    /// Set `show_sizes` to determine whether to display partition sizes
391    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
392        self.show_sizes = show_sizes;
393        self
394    }
395
396    /// Ref to partitions
397    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
398        &self.partitions
399    }
400
401    /// Ref to projection
402    pub fn projection(&self) -> &Option<Vec<usize>> {
403        &self.projection
404    }
405
406    /// Show sizes
407    pub fn show_sizes(&self) -> bool {
408        self.show_sizes
409    }
410
411    /// Ref to sort information
412    pub fn sort_information(&self) -> &[LexOrdering] {
413        &self.sort_information
414    }
415
416    /// A memory table can be ordered by multiple expressions simultaneously.
417    /// [`EquivalenceProperties`] keeps track of expressions that describe the
418    /// global ordering of the schema. These columns are not necessarily same; e.g.
419    /// ```text
420    /// ┌-------┐
421    /// | a | b |
422    /// |---|---|
423    /// | 1 | 9 |
424    /// | 2 | 8 |
425    /// | 3 | 7 |
426    /// | 5 | 5 |
427    /// └---┴---┘
428    /// ```
429    /// where both `a ASC` and `b DESC` can describe the table ordering. With
430    /// [`EquivalenceProperties`], we can keep track of these equivalences
431    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
432    ///
433    /// Note that if there is an internal projection, that projection will be
434    /// also applied to the given `sort_information`.
435    pub fn try_with_sort_information(
436        mut self,
437        mut sort_information: Vec<LexOrdering>,
438    ) -> Result<Self> {
439        // All sort expressions must refer to the original schema
440        let fields = self.schema.fields();
441        let ambiguous_column = sort_information
442            .iter()
443            .flat_map(|ordering| ordering.clone())
444            .flat_map(|expr| collect_columns(&expr.expr))
445            .find(|col| {
446                fields
447                    .get(col.index())
448                    .map(|field| field.name() != col.name())
449                    .unwrap_or(true)
450            });
451        assert_or_internal_err!(
452            ambiguous_column.is_none(),
453            "Column {:?} is not found in the original schema of the MemorySourceConfig",
454            ambiguous_column.as_ref().unwrap()
455        );
456
457        // If there is a projection on the source, we also need to project orderings
458        if self.projection.is_some() {
459            sort_information =
460                project_orderings(&sort_information, &self.projected_schema);
461        }
462
463        self.sort_information = sort_information;
464        Ok(self)
465    }
466
467    /// Arc clone of ref to original schema
468    pub fn original_schema(&self) -> SchemaRef {
469        Arc::clone(&self.schema)
470    }
471
472    /// Repartition while preserving order.
473    ///
474    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
475    /// as having too few batches to fulfill the `target_partitions` or if unable
476    /// to preserve output ordering.
477    fn repartition_preserving_order(
478        &self,
479        target_partitions: usize,
480        output_ordering: LexOrdering,
481    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
482        if !self.eq_properties().ordering_satisfy(output_ordering)? {
483            Ok(None)
484        } else {
485            let total_num_batches =
486                self.partitions.iter().map(|b| b.len()).sum::<usize>();
487            if total_num_batches < target_partitions {
488                // no way to create the desired repartitioning
489                return Ok(None);
490            }
491
492            let cnt_to_repartition = target_partitions - self.partitions.len();
493
494            // Label the current partitions and their order.
495            // Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
496            let to_repartition = self
497                .partitions
498                .iter()
499                .enumerate()
500                .map(|(idx, batches)| RePartition {
501                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
502                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
503                    batches: batches.clone(),
504                })
505                .collect_vec();
506
507            // Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
508            // by count of rows.
509            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
510            for rep in to_repartition {
511                max_heap.push(CompareByRowCount(rep));
512            }
513
514            // Split the largest partitions into smaller partitions. Maintaining the output
515            // order of the partitions & newly created partitions.
516            let mut cannot_split_further = Vec::with_capacity(target_partitions);
517            for _ in 0..cnt_to_repartition {
518                // triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
519                loop {
520                    // Take the largest item off the heap, and attempt to split.
521                    let Some(to_split) = max_heap.pop() else {
522                        // Nothing left to attempt repartition. Break inner loop.
523                        break;
524                    };
525
526                    // Split the partition. The new partitions will be ordered with idx and idx+1.
527                    let mut new_partitions = to_split.into_inner().split();
528                    if new_partitions.len() > 1 {
529                        for new_partition in new_partitions {
530                            max_heap.push(CompareByRowCount(new_partition));
531                        }
532                        // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
533                        break;
534                    } else {
535                        cannot_split_further.push(new_partitions.remove(0));
536                    }
537                }
538            }
539            let mut partitions = max_heap
540                .drain()
541                .map(CompareByRowCount::into_inner)
542                .collect_vec();
543            partitions.extend(cannot_split_further);
544
545            // Finally, sort all partitions by the output ordering.
546            // This was the original ordering of the batches within the partition. We are maintaining this ordering.
547            partitions.sort_by_key(|p| p.idx);
548            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
549
550            Ok(Some(partitions))
551        }
552    }
553
554    /// Repartition into evenly sized chunks (as much as possible without batch splitting),
555    /// disregarding any ordering.
556    ///
557    /// Current implementation uses a first-fit-decreasing bin packing, modified to enable
558    /// us to still return the desired count of `target_partitions`.
559    ///
560    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
561    /// as having too few batches to fulfill the `target_partitions`.
562    fn repartition_evenly_by_size(
563        &self,
564        target_partitions: usize,
565    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
566        // determine if we have enough total batches to fulfill request
567        let mut flatten_batches =
568            self.partitions.clone().into_iter().flatten().collect_vec();
569        if flatten_batches.len() < target_partitions {
570            return Ok(None);
571        }
572
573        // Take all flattened batches (all in 1 partititon/vec) and divide evenly into the desired number of `target_partitions`.
574        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
575        // sort by size, so we pack multiple smaller batches into the same partition
576        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
577
578        // Divide.
579        let mut partitions =
580            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
581        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
582        let mut total_rows_seen = 0;
583        let mut curr_bin_row_count = 0;
584        let mut idx = 0;
585        for batch in flatten_batches {
586            let row_cnt = batch.num_rows();
587            idx = std::cmp::min(idx, target_partitions - 1);
588
589            partitions[idx].push(batch);
590            curr_bin_row_count += row_cnt;
591            total_rows_seen += row_cnt;
592
593            if curr_bin_row_count >= target_partition_size {
594                idx += 1;
595                curr_bin_row_count = 0;
596
597                // update target_partition_size, to handle very lopsided batch distributions
598                // while still returning the count of `target_partitions`
599                if total_rows_seen < total_num_rows {
600                    target_partition_size = (total_num_rows - total_rows_seen)
601                        .div_ceil(target_partitions - idx);
602                }
603            }
604        }
605
606        Ok(Some(partitions))
607    }
608}
609
610/// For use in repartitioning, track the total size and original partition index.
611///
612/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
613struct RePartition {
614    /// Original output ordering for the partition.
615    idx: usize,
616    /// Total size of the partition, for use in heap ordering
617    /// (a.k.a. splitting up the largest partitions).
618    row_count: usize,
619    /// A partition containing record batches.
620    batches: Vec<RecordBatch>,
621}
622
623impl RePartition {
624    /// Split [`RePartition`] into 2 pieces, consuming self.
625    ///
626    /// Returns only 1 partition if cannot be split further.
627    fn split(self) -> Vec<Self> {
628        if self.batches.len() == 1 {
629            return vec![self];
630        }
631
632        let new_0 = RePartition {
633            idx: self.idx, // output ordering
634            row_count: 0,
635            batches: vec![],
636        };
637        let new_1 = RePartition {
638            idx: self.idx + 1, // output ordering +1
639            row_count: 0,
640            batches: vec![],
641        };
642        let split_pt = self.row_count / 2;
643
644        let [new_0, new_1] = self.batches.into_iter().fold(
645            [new_0, new_1],
646            |[mut new0, mut new1], batch| {
647                if new0.row_count < split_pt {
648                    new0.add_batch(batch);
649                } else {
650                    new1.add_batch(batch);
651                }
652                [new0, new1]
653            },
654        );
655        vec![new_0, new_1]
656    }
657
658    fn add_batch(&mut self, batch: RecordBatch) {
659        self.row_count += batch.num_rows();
660        self.batches.push(batch);
661    }
662}
663
664impl fmt::Display for RePartition {
665    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
666        write!(
667            f,
668            "{}rows-in-{}batches@{}",
669            self.row_count,
670            self.batches.len(),
671            self.idx
672        )
673    }
674}
675
676struct CompareByRowCount(RePartition);
677impl CompareByRowCount {
678    fn into_inner(self) -> RePartition {
679        self.0
680    }
681}
682impl Ord for CompareByRowCount {
683    fn cmp(&self, other: &Self) -> Ordering {
684        self.0.row_count.cmp(&other.0.row_count)
685    }
686}
687impl PartialOrd for CompareByRowCount {
688    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
689        Some(self.cmp(other))
690    }
691}
692impl PartialEq for CompareByRowCount {
693    fn eq(&self, other: &Self) -> bool {
694        // PartialEq must be consistent with PartialOrd
695        self.cmp(other) == Ordering::Equal
696    }
697}
698impl Eq for CompareByRowCount {}
699impl Deref for CompareByRowCount {
700    type Target = RePartition;
701    fn deref(&self) -> &Self::Target {
702        &self.0
703    }
704}
705
706/// Type alias for partition data
707pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
708
709/// Implements for writing to a [`MemTable`]
710///
711/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
712pub struct MemSink {
713    /// Target locations for writing data
714    batches: Vec<PartitionData>,
715    schema: SchemaRef,
716}
717
718impl Debug for MemSink {
719    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720        f.debug_struct("MemSink")
721            .field("num_partitions", &self.batches.len())
722            .finish()
723    }
724}
725
726impl DisplayAs for MemSink {
727    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728        match t {
729            DisplayFormatType::Default | DisplayFormatType::Verbose => {
730                let partition_count = self.batches.len();
731                write!(f, "MemoryTable (partitions={partition_count})")
732            }
733            DisplayFormatType::TreeRender => {
734                // TODO: collect info
735                write!(f, "")
736            }
737        }
738    }
739}
740
741impl MemSink {
742    /// Creates a new [`MemSink`].
743    ///
744    /// The caller is responsible for ensuring that there is at least one partition to insert into.
745    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
746        if batches.is_empty() {
747            return plan_err!("Cannot insert into MemTable with zero partitions");
748        }
749        Ok(Self { batches, schema })
750    }
751}
752
753#[async_trait]
754impl DataSink for MemSink {
755    fn schema(&self) -> &SchemaRef {
756        &self.schema
757    }
758
759    async fn write_all(
760        &self,
761        mut data: SendableRecordBatchStream,
762        _context: &Arc<TaskContext>,
763    ) -> Result<u64> {
764        let num_partitions = self.batches.len();
765
766        // buffer up the data round robin style into num_partitions
767
768        let mut new_batches = vec![vec![]; num_partitions];
769        let mut i = 0;
770        let mut row_count = 0;
771        while let Some(batch) = data.next().await.transpose()? {
772            row_count += batch.num_rows();
773            new_batches[i].push(batch);
774            i = (i + 1) % num_partitions;
775        }
776
777        // write the outputs into the batches
778        for (target, mut batches) in self.batches.iter().zip(new_batches) {
779            // Append all the new batches in one go to minimize locking overhead
780            target.write().await.append(&mut batches);
781        }
782
783        Ok(row_count as u64)
784    }
785}
786
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Two independent orderings declared on an empty in-memory source must both be
    /// recorded in the equivalence properties. The reported output ordering must be
    /// their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(
            ["a", "b", "c"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Int64, false))
                .collect::<Vec<_>>(),
        ));

        let ascending = |name: &str| -> Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };
        let sort1: LexOrdering = [ascending("a")?, ascending("b")?].into();
        let sort2: LexOrdering = [ascending("c")?].into();

        let expected_output_order: LexOrdering = {
            let mut combined = sort1.clone();
            combined.extend(sort2.clone());
            combined
        };

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        let properties = mem_exec.properties();
        assert_eq!(
            properties.output_ordering().unwrap(),
            &expected_output_order
        );
        let oeq_class = properties.equivalence_properties().oeq_class();
        assert!(oeq_class.contains(&sort1));
        assert!(oeq_class.contains(&sort2));
        Ok(())
    }
}
843
844#[cfg(test)]
845mod tests {
846    use super::*;
847    use crate::test_util::col;
848    use crate::tests::{aggr_test_schema, make_partition};
849
850    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
851    use arrow::datatypes::{DataType, Field};
852    use datafusion_common::assert_batches_eq;
853    use datafusion_common::stats::{ColumnStatistics, Precision};
854    use datafusion_physical_expr::PhysicalSortExpr;
855    use datafusion_physical_plan::expressions::lit;
856
857    use datafusion_physical_plan::ExecutionPlan;
858
859    #[tokio::test]
860    async fn exec_with_limit() -> Result<()> {
861        let task_ctx = Arc::new(TaskContext::default());
862        let batch = make_partition(7);
863        let schema = batch.schema();
864        let batches = vec![batch.clone(), batch];
865
866        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
867        assert_eq!(exec.fetch(), None);
868
869        let exec = exec.with_fetch(Some(4)).unwrap();
870        assert_eq!(exec.fetch(), Some(4));
871
872        let mut it = exec.execute(0, task_ctx)?;
873        let mut results = vec![];
874        while let Some(batch) = it.next().await {
875            results.push(batch?);
876        }
877
878        let expected = [
879            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
880        ];
881        assert_batches_eq!(expected, &results);
882        Ok(())
883    }
884
885    /// Test that `try_swapping_with_projection` preserves the `fetch` limit.
886    /// Regression test for <https://github.com/apache/datafusion/issues/21176>
887    #[test]
888    fn try_swapping_with_projection_preserves_fetch() {
889        use datafusion_physical_expr::projection::ProjectionExprs;
890
891        let schema = Arc::new(Schema::new(vec![
892            Field::new("a", DataType::Int32, false),
893            Field::new("b", DataType::Utf8, false),
894            Field::new("c", DataType::Int64, false),
895        ]));
896        let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch(10)]];
897        let source = MemorySourceConfig::try_new(&partitions, schema.clone(), None)
898            .unwrap()
899            .with_limit(Some(5));
900
901        assert_eq!(source.fetch, Some(5));
902
903        // Create a projection that reorders columns: [c, a] (indices 2, 0)
904        let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
905        let swapped = source
906            .try_swapping_with_projection(&projection)
907            .unwrap()
908            .unwrap();
909        let new_source = swapped.downcast_ref::<MemorySourceConfig>().unwrap();
910
911        assert_eq!(
912            new_source.fetch,
913            Some(5),
914            "fetch limit must be preserved after projection pushdown"
915        );
916    }
917
918    #[tokio::test]
919    async fn values_empty_case() -> Result<()> {
920        let schema = aggr_test_schema();
921        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
922        assert!(empty.is_err());
923        Ok(())
924    }
925
926    #[test]
927    fn new_exec_with_batches() {
928        let batch = make_partition(7);
929        let schema = batch.schema();
930        let batches = vec![batch.clone(), batch];
931        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
932    }
933
934    #[test]
935    fn new_exec_with_batches_empty() {
936        let batch = make_partition(7);
937        let schema = batch.schema();
938        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
939    }
940
941    #[test]
942    fn new_exec_with_batches_invalid_schema() {
943        let batch = make_partition(7);
944        let batches = vec![batch.clone(), batch];
945
946        let invalid_schema = Arc::new(Schema::new(vec![
947            Field::new("col0", DataType::UInt32, false),
948            Field::new("col1", DataType::Utf8, false),
949        ]));
950        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
951            .unwrap_err();
952    }
953
954    // Test issue: https://github.com/apache/datafusion/issues/8763
955    #[test]
956    fn new_exec_with_non_nullable_schema() {
957        let schema = Arc::new(Schema::new(vec![Field::new(
958            "col0",
959            DataType::UInt32,
960            false,
961        )]));
962        let _ = MemorySourceConfig::try_new_as_values(
963            Arc::clone(&schema),
964            vec![vec![lit(1u32)]],
965        )
966        .unwrap();
967        // Test that a null value is rejected
968        let _ = MemorySourceConfig::try_new_as_values(
969            schema,
970            vec![vec![lit(ScalarValue::UInt32(None))]],
971        )
972        .unwrap_err();
973    }
974
975    #[test]
976    fn values_stats_with_nulls_only() -> Result<()> {
977        let data = vec![
978            vec![lit(ScalarValue::Null)],
979            vec![lit(ScalarValue::Null)],
980            vec![lit(ScalarValue::Null)],
981        ];
982        let rows = data.len();
983        let schema =
984            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
985        let values = MemorySourceConfig::try_new_as_values(schema, data)?;
986
987        assert_eq!(
988            *values.partition_statistics(None)?,
989            Statistics {
990                num_rows: Precision::Exact(rows),
991                total_byte_size: Precision::Exact(8), // not important
992                column_statistics: vec![ColumnStatistics {
993                    null_count: Precision::Exact(rows), // there are only nulls
994                    distinct_count: Precision::Absent,
995                    max_value: Precision::Absent,
996                    min_value: Precision::Absent,
997                    sum_value: Precision::Absent,
998                    byte_size: Precision::Absent,
999                },],
1000            }
1001        );
1002
1003        Ok(())
1004    }
1005
1006    fn batch(row_size: usize) -> RecordBatch {
1007        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
1008        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
1009        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
1010        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
1011    }
1012
1013    fn schema() -> SchemaRef {
1014        batch(1).schema()
1015    }
1016
1017    fn memorysrcconfig_no_partitions(
1018        sort_information: Vec<LexOrdering>,
1019    ) -> Result<MemorySourceConfig> {
1020        let partitions = vec![];
1021        MemorySourceConfig::try_new(&partitions, schema(), None)?
1022            .try_with_sort_information(sort_information)
1023    }
1024
1025    fn memorysrcconfig_1_partition_1_batch(
1026        sort_information: Vec<LexOrdering>,
1027    ) -> Result<MemorySourceConfig> {
1028        let partitions = vec![vec![batch(100)]];
1029        MemorySourceConfig::try_new(&partitions, schema(), None)?
1030            .try_with_sort_information(sort_information)
1031    }
1032
1033    fn memorysrcconfig_3_partitions_1_batch_each(
1034        sort_information: Vec<LexOrdering>,
1035    ) -> Result<MemorySourceConfig> {
1036        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
1037        MemorySourceConfig::try_new(&partitions, schema(), None)?
1038            .try_with_sort_information(sort_information)
1039    }
1040
1041    fn memorysrcconfig_3_partitions_with_2_batches_each(
1042        sort_information: Vec<LexOrdering>,
1043    ) -> Result<MemorySourceConfig> {
1044        let partitions = vec![
1045            vec![batch(100), batch(100)],
1046            vec![batch(100), batch(100)],
1047            vec![batch(100), batch(100)],
1048        ];
1049        MemorySourceConfig::try_new(&partitions, schema(), None)?
1050            .try_with_sort_information(sort_information)
1051    }
1052
1053    /// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
1054    /// in the Memtable partition (a.k.a. vector of batches).
1055    fn memorysrcconfig_1_partition_with_different_sized_batches(
1056        sort_information: Vec<LexOrdering>,
1057    ) -> Result<MemorySourceConfig> {
1058        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1059        MemorySourceConfig::try_new(&partitions, schema(), None)?
1060            .try_with_sort_information(sort_information)
1061    }
1062
1063    /// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
1064    /// but the batches are ordered differently (not by size)
1065    /// in the Memtable partition (a.k.a. vector of batches).
1066    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1067        sort_information: Vec<LexOrdering>,
1068    ) -> Result<MemorySourceConfig> {
1069        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1070        MemorySourceConfig::try_new(&partitions, schema(), None)?
1071            .try_with_sort_information(sort_information)
1072    }
1073
1074    fn memorysrcconfig_2_partition_with_different_sized_batches(
1075        sort_information: Vec<LexOrdering>,
1076    ) -> Result<MemorySourceConfig> {
1077        let partitions = vec![
1078            vec![batch(100_000), batch(10_000), batch(1_000)],
1079            vec![batch(2_000), batch(20)],
1080        ];
1081        MemorySourceConfig::try_new(&partitions, schema(), None)?
1082            .try_with_sort_information(sort_information)
1083    }
1084
1085    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1086        sort_information: Vec<LexOrdering>,
1087    ) -> Result<MemorySourceConfig> {
1088        let partitions = vec![
1089            vec![
1090                batch(100_000),
1091                batch(1),
1092                batch(1),
1093                batch(1),
1094                batch(1),
1095                batch(0),
1096            ],
1097            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1098        ];
1099        MemorySourceConfig::try_new(&partitions, schema(), None)?
1100            .try_with_sort_information(sort_information)
1101    }
1102
1103    /// Assert that we get the expected count of partitions after repartitioning.
1104    ///
1105    /// If None, then we expected the [`DataSource::repartitioned`] to return None.
1106    fn assert_partitioning(
1107        partitioned_datasrc: Option<Arc<dyn DataSource>>,
1108        partition_cnt: Option<usize>,
1109    ) {
1110        let should_exist = if let Some(partition_cnt) = partition_cnt {
1111            format!("new datasource should exist and have {partition_cnt:?} partitions")
1112        } else {
1113            "new datasource should not exist".into()
1114        };
1115
1116        let actual = partitioned_datasrc
1117            .map(|datasrc| datasrc.output_partitioning().partition_count());
1118        assert_eq!(
1119            actual, partition_cnt,
1120            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1121        );
1122    }
1123
1124    fn run_all_test_scenarios(
1125        output_ordering: Option<LexOrdering>,
1126        sort_information_on_config: Vec<LexOrdering>,
1127    ) -> Result<()> {
1128        let not_used = usize::MAX;
1129
1130        // src has no partitions
1131        let mem_src_config =
1132            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1133        let partitioned_datasrc =
1134            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1135        assert_partitioning(partitioned_datasrc, None);
1136
1137        // src has partitions == target partitions (=1)
1138        let target_partitions = 1;
1139        let mem_src_config =
1140            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1141        let partitioned_datasrc = mem_src_config.repartitioned(
1142            target_partitions,
1143            not_used,
1144            output_ordering.clone(),
1145        )?;
1146        assert_partitioning(partitioned_datasrc, None);
1147
1148        // src has partitions == target partitions (=3)
1149        let target_partitions = 3;
1150        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1151            sort_information_on_config.clone(),
1152        )?;
1153        let partitioned_datasrc = mem_src_config.repartitioned(
1154            target_partitions,
1155            not_used,
1156            output_ordering.clone(),
1157        )?;
1158        assert_partitioning(partitioned_datasrc, None);
1159
1160        // src has partitions > target partitions, but we don't merge them
1161        let target_partitions = 2;
1162        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1163            sort_information_on_config.clone(),
1164        )?;
1165        let partitioned_datasrc = mem_src_config.repartitioned(
1166            target_partitions,
1167            not_used,
1168            output_ordering.clone(),
1169        )?;
1170        assert_partitioning(partitioned_datasrc, None);
1171
1172        // src has partitions < target partitions, but not enough batches (per partition) to split into more partitions
1173        let target_partitions = 4;
1174        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1175            sort_information_on_config.clone(),
1176        )?;
1177        let partitioned_datasrc = mem_src_config.repartitioned(
1178            target_partitions,
1179            not_used,
1180            output_ordering.clone(),
1181        )?;
1182        assert_partitioning(partitioned_datasrc, None);
1183
1184        // src has partitions < target partitions, and can split to sufficient amount
1185        // has 6 batches across 3 partitions. Will need to split 2 of it's partitions.
1186        let target_partitions = 5;
1187        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1188            sort_information_on_config.clone(),
1189        )?;
1190        let partitioned_datasrc = mem_src_config.repartitioned(
1191            target_partitions,
1192            not_used,
1193            output_ordering.clone(),
1194        )?;
1195        assert_partitioning(partitioned_datasrc, Some(5));
1196
1197        // src has partitions < target partitions, and can split to sufficient amount
1198        // has 6 batches across 3 partitions. Will need to split all of it's partitions.
1199        let target_partitions = 6;
1200        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1201            sort_information_on_config.clone(),
1202        )?;
1203        let partitioned_datasrc = mem_src_config.repartitioned(
1204            target_partitions,
1205            not_used,
1206            output_ordering.clone(),
1207        )?;
1208        assert_partitioning(partitioned_datasrc, Some(6));
1209
1210        // src has partitions < target partitions, but not enough total batches to fulfill the split (desired target_partitions)
1211        let target_partitions = 3 * 2 + 1;
1212        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1213            sort_information_on_config.clone(),
1214        )?;
1215        let partitioned_datasrc = mem_src_config.repartitioned(
1216            target_partitions,
1217            not_used,
1218            output_ordering.clone(),
1219        )?;
1220        assert_partitioning(partitioned_datasrc, None);
1221
1222        // src has 1 partition with many batches of lopsided sizes
1223        // make sure it handles the split properly
1224        let target_partitions = 2;
1225        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1226            sort_information_on_config,
1227        )?;
1228        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1229            target_partitions,
1230            not_used,
1231            output_ordering,
1232        )?;
1233        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1234        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
1235        // It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
1236        let partitioned_datasrc = partitioned_datasrc.unwrap();
1237        let Some(mem_src_config) =
1238            partitioned_datasrc.downcast_ref::<MemorySourceConfig>()
1239        else {
1240            unreachable!()
1241        };
1242        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1243        assert_eq!(repartitioned_raw_batches.len(), 2);
1244        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1245            unreachable!()
1246        };
1247        // p1=batch(100_000)
1248        assert_eq!(p1.len(), 1);
1249        assert_eq!(p1[0].num_rows(), 100_000);
1250        // p2=[batch(10_000), batch(100), batch(1)]
1251        assert_eq!(p2.len(), 3);
1252        assert_eq!(p2[0].num_rows(), 10_000);
1253        assert_eq!(p2[1].num_rows(), 100);
1254        assert_eq!(p2[2].num_rows(), 1);
1255
1256        Ok(())
1257    }
1258
1259    #[test]
1260    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
1261        let no_sort = vec![];
1262        let no_output_ordering = None;
1263
1264        // Test: common set of functionality
1265        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
1266
1267        // Test: how no-sort-order divides differently.
1268        //    * does not preserve separate partitions (with own internal ordering) on even split,
1269        //    * nor does it preserve ordering (re-orders batch(2_000) vs batch(1_000)).
1270        let target_partitions = 3;
1271        let mem_src_config =
1272            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
1273        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1274            target_partitions,
1275            usize::MAX,
1276            no_output_ordering,
1277        )?;
1278        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1279        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1280        // It should have split as p1=batch(100_000), p2=batch(10_000),  p3=rest(mixed across original partitions)
1281        let repartitioned_raw_batches = mem_src_config
1282            .repartition_evenly_by_size(target_partitions)?
1283            .unwrap();
1284        assert_eq!(repartitioned_raw_batches.len(), 3);
1285        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1286            unreachable!()
1287        };
1288        // p1=batch(100_000)
1289        assert_eq!(p1.len(), 1);
1290        assert_eq!(p1[0].num_rows(), 100_000);
1291        // p2=batch(10_000)
1292        assert_eq!(p2.len(), 1);
1293        assert_eq!(p2[0].num_rows(), 10_000);
1294        // p3= batch(2_000), batch(1_000), batch(20)
1295        assert_eq!(p3.len(), 3);
1296        assert_eq!(p3[0].num_rows(), 2_000);
1297        assert_eq!(p3[1].num_rows(), 1_000);
1298        assert_eq!(p3[2].num_rows(), 20);
1299
1300        Ok(())
1301    }
1302
1303    #[test]
1304    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches()
1305    -> Result<()> {
1306        let no_sort = vec![];
1307        let no_output_ordering = None;
1308
1309        // Test: case has two input partitions:
1310        //     b(100_000), b(1), b(1), b(1), b(1), b(0)
1311        //     b(1), b(1), b(1), b(1), b(0), b(100)
1312        //
1313        // We want an output with target_partitions=5, which means the ideal division is:
1314        //     b(100_000)
1315        //     b(100)
1316        //     b(1), b(1), b(1)
1317        //     b(1), b(1), b(1)
1318        //     b(1), b(1), b(0)
1319        let target_partitions = 5;
1320        let mem_src_config =
1321            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
1322        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1323            target_partitions,
1324            usize::MAX,
1325            no_output_ordering,
1326        )?;
1327        assert_partitioning(partitioned_datasrc.clone(), Some(5));
1328        // Starting partition 1 = batch(100_000), batch(1), batch(1), batch(1), batch(1), batch(0)
1329        // Starting partition 1 = batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)
1330        // It should have split as p1=batch(100_000), p2=batch(100), p3=[batch(1),batch(1)], p4=[batch(1),batch(1)], p5=[batch(1),batch(1),batch(0),batch(0)]
1331        let repartitioned_raw_batches = mem_src_config
1332            .repartition_evenly_by_size(target_partitions)?
1333            .unwrap();
1334        assert_eq!(repartitioned_raw_batches.len(), 5);
1335        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
1336        else {
1337            unreachable!()
1338        };
1339        // p1=batch(100_000)
1340        assert_eq!(p1.len(), 1);
1341        assert_eq!(p1[0].num_rows(), 100_000);
1342        // p2=batch(100)
1343        assert_eq!(p2.len(), 1);
1344        assert_eq!(p2[0].num_rows(), 100);
1345        // p3=[batch(1),batch(1),batch(1)]
1346        assert_eq!(p3.len(), 3);
1347        assert_eq!(p3[0].num_rows(), 1);
1348        assert_eq!(p3[1].num_rows(), 1);
1349        assert_eq!(p3[2].num_rows(), 1);
1350        // p4=[batch(1),batch(1),batch(1)]
1351        assert_eq!(p4.len(), 3);
1352        assert_eq!(p4[0].num_rows(), 1);
1353        assert_eq!(p4[1].num_rows(), 1);
1354        assert_eq!(p4[2].num_rows(), 1);
1355        // p5=[batch(1),batch(1),batch(0),batch(0)]
1356        assert_eq!(p5.len(), 4);
1357        assert_eq!(p5[0].num_rows(), 1);
1358        assert_eq!(p5[1].num_rows(), 1);
1359        assert_eq!(p5[2].num_rows(), 0);
1360        assert_eq!(p5[3].num_rows(), 0);
1361
1362        Ok(())
1363    }
1364
1365    #[test]
1366    fn test_repartition_with_sort_information() -> Result<()> {
1367        let schema = schema();
1368        let sort_key: LexOrdering =
1369            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1370        let has_sort = vec![sort_key.clone()];
1371        let output_ordering = Some(sort_key);
1372
1373        // Test: common set of functionality
1374        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
1375
1376        // Test: DOES preserve separate partitions (with own internal ordering)
1377        let target_partitions = 3;
1378        let mem_src_config =
1379            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
1380        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1381            target_partitions,
1382            usize::MAX,
1383            output_ordering.clone(),
1384        )?;
1385        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1386        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1387        // It should have split as p1=batch(100_000), p2=[batch(10_000),batch(1_000)],  p3=<other_partition>
1388        let Some(output_ord) = output_ordering else {
1389            unreachable!()
1390        };
1391        let repartitioned_raw_batches = mem_src_config
1392            .repartition_preserving_order(target_partitions, output_ord)?
1393            .unwrap();
1394        assert_eq!(repartitioned_raw_batches.len(), 3);
1395        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1396            unreachable!()
1397        };
1398        // p1=batch(100_000)
1399        assert_eq!(p1.len(), 1);
1400        assert_eq!(p1[0].num_rows(), 100_000);
1401        // p2=[batch(10_000),batch(1_000)]
1402        assert_eq!(p2.len(), 2);
1403        assert_eq!(p2[0].num_rows(), 10_000);
1404        assert_eq!(p2[1].num_rows(), 1_000);
1405        // p3=batch(2_000), batch(20)
1406        assert_eq!(p3.len(), 2);
1407        assert_eq!(p3[0].num_rows(), 2_000);
1408        assert_eq!(p3[1].num_rows(), 20);
1409
1410        Ok(())
1411    }
1412
1413    #[test]
1414    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1415        let schema = schema();
1416        let sort_key: LexOrdering =
1417            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1418        let has_sort = vec![sort_key.clone()];
1419        let output_ordering = Some(sort_key);
1420
1421        // src has 1 partition with many batches of lopsided sizes
1422        // note that the input vector of batches are not ordered by decreasing size
1423        let target_partitions = 2;
1424        let mem_src_config =
1425            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1426        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1427            target_partitions,
1428            usize::MAX,
1429            output_ordering,
1430        )?;
1431        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1432        // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
1433        // It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
1434        let partitioned_datasrc = partitioned_datasrc.unwrap();
1435        let Some(mem_src_config) =
1436            partitioned_datasrc.downcast_ref::<MemorySourceConfig>()
1437        else {
1438            unreachable!()
1439        };
1440        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1441        assert_eq!(repartitioned_raw_batches.len(), 2);
1442        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1443            unreachable!()
1444        };
1445        // p1=batch(100_000)
1446        assert_eq!(p1.len(), 1);
1447        assert_eq!(p1[0].num_rows(), 100_000);
1448        // p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
1449        assert_eq!(p2.len(), 3);
1450        assert_eq!(p2[0].num_rows(), 1);
1451        assert_eq!(p2[1].num_rows(), 100);
1452        assert_eq!(p2[2].num_rows(), 10_000);
1453
1454        Ok(())
1455    }
1456}