datafusion_datasource/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use crate::sink::DataSink;
26use crate::source::{DataSource, DataSourceExec};
27
28use arrow::array::{RecordBatch, RecordBatchOptions};
29use arrow::datatypes::{Schema, SchemaRef};
30use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::equivalence::{
33    OrderingEquivalenceClass, ProjectionMapping,
34};
35use datafusion_physical_expr::expressions::Column;
36use datafusion_physical_expr::utils::collect_columns;
37use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
38use datafusion_physical_plan::memory::MemoryStream;
39use datafusion_physical_plan::projection::{
40    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
41};
42use datafusion_physical_plan::{
43    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
44    PhysicalExpr, SendableRecordBatchStream, Statistics,
45};
46
47use async_trait::async_trait;
48use datafusion_physical_plan::coop::cooperative;
49use datafusion_physical_plan::execution_plan::SchedulingType;
50use futures::StreamExt;
51use itertools::Itertools;
52use tokio::sync::RwLock;
53
/// Data source configuration for reading in-memory batches of data
///
/// Batches are grouped into partitions; each partition is read by its own
/// stream (see the `DataSource::open` implementation for this type).
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query.
    ///
    /// Each partition is a `Vec<RecordBatch>`.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection: indices of the columns of `schema` to output
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings.
    ///
    /// These are expressed against `projected_schema`: orderings passed to
    /// `try_with_sort_information` are projected when a `projection` is set.
    sort_information: Vec<LexOrdering>,
    /// if partition sizes (number of batches per partition) should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records are returned. The limit is handed to each partition's
    /// stream, i.e. it is applied per partition.
    fetch: Option<usize>,
}
75
76impl DataSource for MemorySourceConfig {
77    fn open(
78        &self,
79        partition: usize,
80        _context: Arc<TaskContext>,
81    ) -> Result<SendableRecordBatchStream> {
82        Ok(Box::pin(cooperative(
83            MemoryStream::try_new(
84                self.partitions[partition].clone(),
85                Arc::clone(&self.projected_schema),
86                self.projection.clone(),
87            )?
88            .with_fetch(self.fetch),
89        )))
90    }
91
92    fn as_any(&self) -> &dyn Any {
93        self
94    }
95
96    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
97        match t {
98            DisplayFormatType::Default | DisplayFormatType::Verbose => {
99                let partition_sizes: Vec<_> =
100                    self.partitions.iter().map(|b| b.len()).collect();
101
102                let output_ordering = self
103                    .sort_information
104                    .first()
105                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
106                    .unwrap_or_default();
107
108                let eq_properties = self.eq_properties();
109                let constraints = eq_properties.constraints();
110                let constraints = if constraints.is_empty() {
111                    String::new()
112                } else {
113                    format!(", {constraints}")
114                };
115
116                let limit = self
117                    .fetch
118                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
119                if self.show_sizes {
120                    write!(
121                                f,
122                                "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
123                                partition_sizes.len(),
124                            )
125                } else {
126                    write!(
127                        f,
128                        "partitions={}{limit}{output_ordering}{constraints}",
129                        partition_sizes.len(),
130                    )
131                }
132            }
133            DisplayFormatType::TreeRender => {
134                let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
135                let total_bytes: usize = self
136                    .partitions
137                    .iter()
138                    .flatten()
139                    .map(|batch| batch.get_array_memory_size())
140                    .sum();
141                writeln!(f, "format=memory")?;
142                writeln!(f, "rows={total_rows}")?;
143                writeln!(f, "bytes={total_bytes}")?;
144                Ok(())
145            }
146        }
147    }
148
149    /// If possible, redistribute batches across partitions according to their size.
150    ///
151    /// Returns `Ok(None)` if unable to repartition. Preserve output ordering if exists.
152    /// Refer to [`DataSource::repartitioned`] for further details.
153    fn repartitioned(
154        &self,
155        target_partitions: usize,
156        _repartition_file_min_size: usize,
157        output_ordering: Option<LexOrdering>,
158    ) -> Result<Option<Arc<dyn DataSource>>> {
159        if self.partitions.is_empty() || self.partitions.len() >= target_partitions
160        // if have no partitions, or already have more partitions than desired, do not repartition
161        {
162            return Ok(None);
163        }
164
165        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
166            self.repartition_preserving_order(target_partitions, output_ordering)?
167        } else {
168            self.repartition_evenly_by_size(target_partitions)?
169        };
170
171        if let Some(repartitioned) = maybe_repartitioned {
172            Ok(Some(Arc::new(Self::try_new(
173                &repartitioned,
174                self.original_schema(),
175                self.projection.clone(),
176            )?)))
177        } else {
178            Ok(None)
179        }
180    }
181
182    fn output_partitioning(&self) -> Partitioning {
183        Partitioning::UnknownPartitioning(self.partitions.len())
184    }
185
186    fn eq_properties(&self) -> EquivalenceProperties {
187        EquivalenceProperties::new_with_orderings(
188            Arc::clone(&self.projected_schema),
189            self.sort_information.clone(),
190        )
191    }
192
193    fn scheduling_type(&self) -> SchedulingType {
194        SchedulingType::Cooperative
195    }
196
197    fn statistics(&self) -> Result<Statistics> {
198        Ok(common::compute_record_batch_statistics(
199            &self.partitions,
200            &self.schema,
201            self.projection.clone(),
202        ))
203    }
204
205    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
206        let source = self.clone();
207        Some(Arc::new(source.with_limit(limit)))
208    }
209
210    fn fetch(&self) -> Option<usize> {
211        self.fetch
212    }
213
214    fn try_swapping_with_projection(
215        &self,
216        projection: &ProjectionExec,
217    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
218        // If there is any non-column or alias-carrier expression, Projection should not be removed.
219        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
220        all_alias_free_columns(projection.expr())
221            .then(|| {
222                let all_projections = (0..self.schema.fields().len()).collect();
223                let new_projections = new_projections_for_columns(
224                    projection,
225                    self.projection().as_ref().unwrap_or(&all_projections),
226                );
227
228                MemorySourceConfig::try_new_exec(
229                    self.partitions(),
230                    self.original_schema(),
231                    Some(new_projections),
232                )
233                .map(|e| e as _)
234            })
235            .transpose()
236    }
237}
238
239impl MemorySourceConfig {
240    /// Create a new `MemorySourceConfig` for reading in-memory record batches
241    /// The provided `schema` should not have the projection applied.
242    pub fn try_new(
243        partitions: &[Vec<RecordBatch>],
244        schema: SchemaRef,
245        projection: Option<Vec<usize>>,
246    ) -> Result<Self> {
247        let projected_schema = project_schema(&schema, projection.as_ref())?;
248        Ok(Self {
249            partitions: partitions.to_vec(),
250            schema,
251            projected_schema,
252            projection,
253            sort_information: vec![],
254            show_sizes: true,
255            fetch: None,
256        })
257    }
258
259    /// Create a new `DataSourceExec` plan for reading in-memory record batches
260    /// The provided `schema` should not have the projection applied.
261    pub fn try_new_exec(
262        partitions: &[Vec<RecordBatch>],
263        schema: SchemaRef,
264        projection: Option<Vec<usize>>,
265    ) -> Result<Arc<DataSourceExec>> {
266        let source = Self::try_new(partitions, schema, projection)?;
267        Ok(DataSourceExec::from_data_source(source))
268    }
269
270    /// Create a new execution plan from a list of constant values (`ValuesExec`)
271    pub fn try_new_as_values(
272        schema: SchemaRef,
273        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
274    ) -> Result<Arc<DataSourceExec>> {
275        if data.is_empty() {
276            return plan_err!("Values list cannot be empty");
277        }
278
279        let n_row = data.len();
280        let n_col = schema.fields().len();
281
282        // We have this single row batch as a placeholder to satisfy evaluation argument
283        // and generate a single output row
284        let placeholder_schema = Arc::new(Schema::empty());
285        let placeholder_batch = RecordBatch::try_new_with_options(
286            Arc::clone(&placeholder_schema),
287            vec![],
288            &RecordBatchOptions::new().with_row_count(Some(1)),
289        )?;
290
291        // Evaluate each column
292        let arrays = (0..n_col)
293            .map(|j| {
294                (0..n_row)
295                    .map(|i| {
296                        let expr = &data[i][j];
297                        let result = expr.evaluate(&placeholder_batch)?;
298
299                        match result {
300                            ColumnarValue::Scalar(scalar) => Ok(scalar),
301                            ColumnarValue::Array(array) if array.len() == 1 => {
302                                ScalarValue::try_from_array(&array, 0)
303                            }
304                            ColumnarValue::Array(_) => {
305                                plan_err!("Cannot have array values in a values list")
306                            }
307                        }
308                    })
309                    .collect::<Result<Vec<_>>>()
310                    .and_then(ScalarValue::iter_to_array)
311            })
312            .collect::<Result<Vec<_>>>()?;
313
314        let batch = RecordBatch::try_new_with_options(
315            Arc::clone(&schema),
316            arrays,
317            &RecordBatchOptions::new().with_row_count(Some(n_row)),
318        )?;
319
320        let partitions = vec![batch];
321        Self::try_new_from_batches(Arc::clone(&schema), partitions)
322    }
323
324    /// Create a new plan using the provided schema and batches.
325    ///
326    /// Errors if any of the batches don't match the provided schema, or if no
327    /// batches are provided.
328    pub fn try_new_from_batches(
329        schema: SchemaRef,
330        batches: Vec<RecordBatch>,
331    ) -> Result<Arc<DataSourceExec>> {
332        if batches.is_empty() {
333            return plan_err!("Values list cannot be empty");
334        }
335
336        for batch in &batches {
337            let batch_schema = batch.schema();
338            if batch_schema != schema {
339                return plan_err!(
340                    "Batch has invalid schema. Expected: {}, got: {}",
341                    schema,
342                    batch_schema
343                );
344            }
345        }
346
347        let partitions = vec![batches];
348        let source = Self {
349            partitions,
350            schema: Arc::clone(&schema),
351            projected_schema: Arc::clone(&schema),
352            projection: None,
353            sort_information: vec![],
354            show_sizes: true,
355            fetch: None,
356        };
357        Ok(DataSourceExec::from_data_source(source))
358    }
359
360    /// Set the limit of the files
361    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
362        self.fetch = limit;
363        self
364    }
365
366    /// Set `show_sizes` to determine whether to display partition sizes
367    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
368        self.show_sizes = show_sizes;
369        self
370    }
371
372    /// Ref to partitions
373    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
374        &self.partitions
375    }
376
377    /// Ref to projection
378    pub fn projection(&self) -> &Option<Vec<usize>> {
379        &self.projection
380    }
381
382    /// Show sizes
383    pub fn show_sizes(&self) -> bool {
384        self.show_sizes
385    }
386
387    /// Ref to sort information
388    pub fn sort_information(&self) -> &[LexOrdering] {
389        &self.sort_information
390    }
391
392    /// A memory table can be ordered by multiple expressions simultaneously.
393    /// [`EquivalenceProperties`] keeps track of expressions that describe the
394    /// global ordering of the schema. These columns are not necessarily same; e.g.
395    /// ```text
396    /// ┌-------┐
397    /// | a | b |
398    /// |---|---|
399    /// | 1 | 9 |
400    /// | 2 | 8 |
401    /// | 3 | 7 |
402    /// | 5 | 5 |
403    /// └---┴---┘
404    /// ```
405    /// where both `a ASC` and `b DESC` can describe the table ordering. With
406    /// [`EquivalenceProperties`], we can keep track of these equivalences
407    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
408    ///
409    /// Note that if there is an internal projection, that projection will be
410    /// also applied to the given `sort_information`.
411    pub fn try_with_sort_information(
412        mut self,
413        mut sort_information: Vec<LexOrdering>,
414    ) -> Result<Self> {
415        // All sort expressions must refer to the original schema
416        let fields = self.schema.fields();
417        let ambiguous_column = sort_information
418            .iter()
419            .flat_map(|ordering| ordering.clone())
420            .flat_map(|expr| collect_columns(&expr.expr))
421            .find(|col| {
422                fields
423                    .get(col.index())
424                    .map(|field| field.name() != col.name())
425                    .unwrap_or(true)
426            });
427        if let Some(col) = ambiguous_column {
428            return internal_err!(
429                "Column {:?} is not found in the original schema of the MemorySourceConfig",
430                col
431            );
432        }
433
434        // If there is a projection on the source, we also need to project orderings
435        if let Some(projection) = &self.projection {
436            let base_schema = self.original_schema();
437            let proj_exprs = projection.iter().map(|idx| {
438                let name = base_schema.field(*idx).name();
439                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
440            });
441            let projection_mapping =
442                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
443            let base_eqp = EquivalenceProperties::new_with_orderings(
444                Arc::clone(&base_schema),
445                sort_information,
446            );
447            let proj_eqp =
448                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
449            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
450            sort_information = oeq_class.into();
451        }
452
453        self.sort_information = sort_information;
454        Ok(self)
455    }
456
457    /// Arc clone of ref to original schema
458    pub fn original_schema(&self) -> SchemaRef {
459        Arc::clone(&self.schema)
460    }
461
462    /// Repartition while preserving order.
463    ///
464    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
465    /// as having too few batches to fulfill the `target_partitions` or if unable
466    /// to preserve output ordering.
467    fn repartition_preserving_order(
468        &self,
469        target_partitions: usize,
470        output_ordering: LexOrdering,
471    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
472        if !self.eq_properties().ordering_satisfy(output_ordering)? {
473            Ok(None)
474        } else {
475            let total_num_batches =
476                self.partitions.iter().map(|b| b.len()).sum::<usize>();
477            if total_num_batches < target_partitions {
478                // no way to create the desired repartitioning
479                return Ok(None);
480            }
481
482            let cnt_to_repartition = target_partitions - self.partitions.len();
483
484            // Label the current partitions and their order.
485            // Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
486            let to_repartition = self
487                .partitions
488                .iter()
489                .enumerate()
490                .map(|(idx, batches)| RePartition {
491                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
492                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
493                    batches: batches.clone(),
494                })
495                .collect_vec();
496
497            // Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
498            // by count of rows.
499            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
500            for rep in to_repartition {
501                max_heap.push(rep);
502            }
503
504            // Split the largest partitions into smaller partitions. Maintaining the output
505            // order of the partitions & newly created partitions.
506            let mut cannot_split_further = Vec::with_capacity(target_partitions);
507            for _ in 0..cnt_to_repartition {
508                // triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
509                loop {
510                    // Take the largest item off the heap, and attempt to split.
511                    let Some(to_split) = max_heap.pop() else {
512                        // Nothing left to attempt repartition. Break inner loop.
513                        break;
514                    };
515
516                    // Split the partition. The new partitions will be ordered with idx and idx+1.
517                    let mut new_partitions = to_split.split();
518                    if new_partitions.len() > 1 {
519                        for new_partition in new_partitions {
520                            max_heap.push(new_partition);
521                        }
522                        // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
523                        break;
524                    } else {
525                        cannot_split_further.push(new_partitions.remove(0));
526                    }
527                }
528            }
529            let mut partitions = max_heap.drain().collect_vec();
530            partitions.extend(cannot_split_further);
531
532            // Finally, sort all partitions by the output ordering.
533            // This was the original ordering of the batches within the partition. We are maintaining this ordering.
534            partitions.sort_by_key(|p| p.idx);
535            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
536
537            Ok(Some(partitions))
538        }
539    }
540
541    /// Repartition into evenly sized chunks (as much as possible without batch splitting),
542    /// disregarding any ordering.
543    ///
544    /// Current implementation uses a first-fit-decreasing bin packing, modified to enable
545    /// us to still return the desired count of `target_partitions`.
546    ///
547    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
548    /// as having too few batches to fulfill the `target_partitions`.
549    fn repartition_evenly_by_size(
550        &self,
551        target_partitions: usize,
552    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
553        // determine if we have enough total batches to fulfill request
554        let mut flatten_batches =
555            self.partitions.clone().into_iter().flatten().collect_vec();
556        if flatten_batches.len() < target_partitions {
557            return Ok(None);
558        }
559
560        // Take all flattened batches (all in 1 partititon/vec) and divide evenly into the desired number of `target_partitions`.
561        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
562        // sort by size, so we pack multiple smaller batches into the same partition
563        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
564
565        // Divide.
566        let mut partitions =
567            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
568        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
569        let mut total_rows_seen = 0;
570        let mut curr_bin_row_count = 0;
571        let mut idx = 0;
572        for batch in flatten_batches {
573            let row_cnt = batch.num_rows();
574            idx = std::cmp::min(idx, target_partitions - 1);
575
576            partitions[idx].push(batch);
577            curr_bin_row_count += row_cnt;
578            total_rows_seen += row_cnt;
579
580            if curr_bin_row_count >= target_partition_size {
581                idx += 1;
582                curr_bin_row_count = 0;
583
584                // update target_partition_size, to handle very lopsided batch distributions
585                // while still returning the count of `target_partitions`
586                if total_rows_seen < total_num_rows {
587                    target_partition_size = (total_num_rows - total_rows_seen)
588                        .div_ceil(target_partitions - idx);
589                }
590            }
591        }
592
593        Ok(Some(partitions))
594    }
595}
596
/// For use in repartitioning, track the total size and original partition index.
///
/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
///
/// The comparison traits for this type compare `row_count` only. This lets a
/// `BinaryHeap` of `RePartition`s pop the partition holding the most rows first.
struct RePartition {
    /// Output position of this partition. Original partitions are spaced apart
    /// (see `repartition_preserving_order`), so split pieces can take `idx` and
    /// `idx + 1` and later be sorted back into place by this key.
    idx: usize,
    /// Total number of rows across `batches`, for use in heap ordering
    /// (a.k.a. splitting up the largest partitions).
    row_count: usize,
    /// A partition containing record batches.
    batches: Vec<RecordBatch>,
}
609
610impl RePartition {
611    /// Split [`RePartition`] into 2 pieces, consuming self.
612    ///
613    /// Returns only 1 partition if cannot be split further.
614    fn split(self) -> Vec<Self> {
615        if self.batches.len() == 1 {
616            return vec![self];
617        }
618
619        let new_0 = RePartition {
620            idx: self.idx, // output ordering
621            row_count: 0,
622            batches: vec![],
623        };
624        let new_1 = RePartition {
625            idx: self.idx + 1, // output ordering +1
626            row_count: 0,
627            batches: vec![],
628        };
629        let split_pt = self.row_count / 2;
630
631        let [new_0, new_1] = self.batches.into_iter().fold(
632            [new_0, new_1],
633            |[mut new0, mut new1], batch| {
634                if new0.row_count < split_pt {
635                    new0.add_batch(batch);
636                } else {
637                    new1.add_batch(batch);
638                }
639                [new0, new1]
640            },
641        );
642        vec![new_0, new_1]
643    }
644
645    fn add_batch(&mut self, batch: RecordBatch) {
646        self.row_count += batch.num_rows();
647        self.batches.push(batch);
648    }
649}
650
651impl PartialOrd for RePartition {
652    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
653        Some(self.row_count.cmp(&other.row_count))
654    }
655}
656
657impl Ord for RePartition {
658    fn cmp(&self, other: &Self) -> Ordering {
659        self.row_count.cmp(&other.row_count)
660    }
661}
662
663impl PartialEq for RePartition {
664    fn eq(&self, other: &Self) -> bool {
665        self.row_count.eq(&other.row_count)
666    }
667}
668
669impl Eq for RePartition {}
670
671impl fmt::Display for RePartition {
672    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
673        write!(
674            f,
675            "{}rows-in-{}batches@{}",
676            self.row_count,
677            self.batches.len(),
678            self.idx
679        )
680    }
681}
682
/// Type alias for partition data: the record batches of one partition behind
/// an `Arc`-shared `tokio::sync::RwLock`, so a writer such as [`MemSink`] can
/// append to the partition while it is shared.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
685
/// A [`DataSink`] that writes incoming batches into the partitions of a [`MemTable`]
///
/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
pub struct MemSink {
    /// Target locations for writing data, one entry per partition
    batches: Vec<PartitionData>,
    /// Schema of the data accepted by this sink
    schema: SchemaRef,
}
694
695impl Debug for MemSink {
696    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697        f.debug_struct("MemSink")
698            .field("num_partitions", &self.batches.len())
699            .finish()
700    }
701}
702
703impl DisplayAs for MemSink {
704    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
705        match t {
706            DisplayFormatType::Default | DisplayFormatType::Verbose => {
707                let partition_count = self.batches.len();
708                write!(f, "MemoryTable (partitions={partition_count})")
709            }
710            DisplayFormatType::TreeRender => {
711                // TODO: collect info
712                write!(f, "")
713            }
714        }
715    }
716}
717
718impl MemSink {
719    /// Creates a new [`MemSink`].
720    ///
721    /// The caller is responsible for ensuring that there is at least one partition to insert into.
722    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
723        if batches.is_empty() {
724            return plan_err!("Cannot insert into MemTable with zero partitions");
725        }
726        Ok(Self { batches, schema })
727    }
728}
729
730#[async_trait]
731impl DataSink for MemSink {
732    fn as_any(&self) -> &dyn Any {
733        self
734    }
735
736    fn schema(&self) -> &SchemaRef {
737        &self.schema
738    }
739
740    async fn write_all(
741        &self,
742        mut data: SendableRecordBatchStream,
743        _context: &Arc<TaskContext>,
744    ) -> Result<u64> {
745        let num_partitions = self.batches.len();
746
747        // buffer up the data round robin style into num_partitions
748
749        let mut new_batches = vec![vec![]; num_partitions];
750        let mut i = 0;
751        let mut row_count = 0;
752        while let Some(batch) = data.next().await.transpose()? {
753            row_count += batch.num_rows();
754            new_batches[i].push(batch);
755            i = (i + 1) % num_partitions;
756        }
757
758        // write the outputs into the batches
759        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
760            // Append all the new batches in one go to minimize locking overhead
761            target.write().await.append(&mut batches);
762        }
763
764        Ok(row_count as u64)
765    }
766}
767
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Orderings supplied through `try_with_sort_information` must surface both
    /// as the plan's output ordering and as members of its ordering
    /// equivalence class.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(
            ["a", "b", "c"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Int64, false))
                .collect::<Vec<_>>(),
        ));

        // Default-options sort expression on a single named column.
        let by = |name: &str| -> Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };

        let sort1: LexOrdering = [by("a")?, by("b")?].into();
        let sort2: LexOrdering = [by("c")?].into();

        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        let properties = mem_exec.properties();
        assert_eq!(
            properties.output_ordering().unwrap(),
            &expected_output_order
        );
        let oeq_class = properties.equivalence_properties().oeq_class();
        for ordering in [&sort1, &sort2] {
            assert!(oeq_class.contains(ordering));
        }
        Ok(())
    }
}
824
825#[cfg(test)]
826mod tests {
827    use super::*;
828    use crate::test_util::col;
829    use crate::tests::{aggr_test_schema, make_partition};
830
831    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
832    use arrow::datatypes::{DataType, Field};
833    use datafusion_common::assert_batches_eq;
834    use datafusion_common::stats::{ColumnStatistics, Precision};
835    use datafusion_physical_expr::PhysicalSortExpr;
836    use datafusion_physical_plan::expressions::lit;
837
838    use futures::StreamExt;
839
840    #[tokio::test]
841    async fn exec_with_limit() -> Result<()> {
842        let task_ctx = Arc::new(TaskContext::default());
843        let batch = make_partition(7);
844        let schema = batch.schema();
845        let batches = vec![batch.clone(), batch];
846
847        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
848        assert_eq!(exec.fetch(), None);
849
850        let exec = exec.with_fetch(Some(4)).unwrap();
851        assert_eq!(exec.fetch(), Some(4));
852
853        let mut it = exec.execute(0, task_ctx)?;
854        let mut results = vec![];
855        while let Some(batch) = it.next().await {
856            results.push(batch?);
857        }
858
859        let expected = [
860            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
861        ];
862        assert_batches_eq!(expected, &results);
863        Ok(())
864    }
865
866    #[tokio::test]
867    async fn values_empty_case() -> Result<()> {
868        let schema = aggr_test_schema();
869        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
870        assert!(empty.is_err());
871        Ok(())
872    }
873
874    #[test]
875    fn new_exec_with_batches() {
876        let batch = make_partition(7);
877        let schema = batch.schema();
878        let batches = vec![batch.clone(), batch];
879        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
880    }
881
882    #[test]
883    fn new_exec_with_batches_empty() {
884        let batch = make_partition(7);
885        let schema = batch.schema();
886        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
887    }
888
889    #[test]
890    fn new_exec_with_batches_invalid_schema() {
891        let batch = make_partition(7);
892        let batches = vec![batch.clone(), batch];
893
894        let invalid_schema = Arc::new(Schema::new(vec![
895            Field::new("col0", DataType::UInt32, false),
896            Field::new("col1", DataType::Utf8, false),
897        ]));
898        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
899            .unwrap_err();
900    }
901
902    // Test issue: https://github.com/apache/datafusion/issues/8763
903    #[test]
904    fn new_exec_with_non_nullable_schema() {
905        let schema = Arc::new(Schema::new(vec![Field::new(
906            "col0",
907            DataType::UInt32,
908            false,
909        )]));
910        let _ = MemorySourceConfig::try_new_as_values(
911            Arc::clone(&schema),
912            vec![vec![lit(1u32)]],
913        )
914        .unwrap();
915        // Test that a null value is rejected
916        let _ = MemorySourceConfig::try_new_as_values(
917            schema,
918            vec![vec![lit(ScalarValue::UInt32(None))]],
919        )
920        .unwrap_err();
921    }
922
923    #[test]
924    fn values_stats_with_nulls_only() -> Result<()> {
925        let data = vec![
926            vec![lit(ScalarValue::Null)],
927            vec![lit(ScalarValue::Null)],
928            vec![lit(ScalarValue::Null)],
929        ];
930        let rows = data.len();
931        let values = MemorySourceConfig::try_new_as_values(
932            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
933            data,
934        )?;
935
936        assert_eq!(
937            values.partition_statistics(None)?,
938            Statistics {
939                num_rows: Precision::Exact(rows),
940                total_byte_size: Precision::Exact(8), // not important
941                column_statistics: vec![ColumnStatistics {
942                    null_count: Precision::Exact(rows), // there are only nulls
943                    distinct_count: Precision::Absent,
944                    max_value: Precision::Absent,
945                    min_value: Precision::Absent,
946                    sum_value: Precision::Absent,
947                },],
948            }
949        );
950
951        Ok(())
952    }
953
954    fn batch(row_size: usize) -> RecordBatch {
955        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
956        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
957        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
958        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
959    }
960
961    fn schema() -> SchemaRef {
962        batch(1).schema()
963    }
964
965    fn memorysrcconfig_no_partitions(
966        sort_information: Vec<LexOrdering>,
967    ) -> Result<MemorySourceConfig> {
968        let partitions = vec![];
969        MemorySourceConfig::try_new(&partitions, schema(), None)?
970            .try_with_sort_information(sort_information)
971    }
972
973    fn memorysrcconfig_1_partition_1_batch(
974        sort_information: Vec<LexOrdering>,
975    ) -> Result<MemorySourceConfig> {
976        let partitions = vec![vec![batch(100)]];
977        MemorySourceConfig::try_new(&partitions, schema(), None)?
978            .try_with_sort_information(sort_information)
979    }
980
981    fn memorysrcconfig_3_partitions_1_batch_each(
982        sort_information: Vec<LexOrdering>,
983    ) -> Result<MemorySourceConfig> {
984        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
985        MemorySourceConfig::try_new(&partitions, schema(), None)?
986            .try_with_sort_information(sort_information)
987    }
988
989    fn memorysrcconfig_3_partitions_with_2_batches_each(
990        sort_information: Vec<LexOrdering>,
991    ) -> Result<MemorySourceConfig> {
992        let partitions = vec![
993            vec![batch(100), batch(100)],
994            vec![batch(100), batch(100)],
995            vec![batch(100), batch(100)],
996        ];
997        MemorySourceConfig::try_new(&partitions, schema(), None)?
998            .try_with_sort_information(sort_information)
999    }
1000
1001    /// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
1002    /// in the Memtable partition (a.k.a. vector of batches).
1003    fn memorysrcconfig_1_partition_with_different_sized_batches(
1004        sort_information: Vec<LexOrdering>,
1005    ) -> Result<MemorySourceConfig> {
1006        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1007        MemorySourceConfig::try_new(&partitions, schema(), None)?
1008            .try_with_sort_information(sort_information)
1009    }
1010
1011    /// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
1012    /// but the batches are ordered differently (not by size)
1013    /// in the Memtable partition (a.k.a. vector of batches).
1014    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1015        sort_information: Vec<LexOrdering>,
1016    ) -> Result<MemorySourceConfig> {
1017        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1018        MemorySourceConfig::try_new(&partitions, schema(), None)?
1019            .try_with_sort_information(sort_information)
1020    }
1021
1022    fn memorysrcconfig_2_partition_with_different_sized_batches(
1023        sort_information: Vec<LexOrdering>,
1024    ) -> Result<MemorySourceConfig> {
1025        let partitions = vec![
1026            vec![batch(100_000), batch(10_000), batch(1_000)],
1027            vec![batch(2_000), batch(20)],
1028        ];
1029        MemorySourceConfig::try_new(&partitions, schema(), None)?
1030            .try_with_sort_information(sort_information)
1031    }
1032
1033    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1034        sort_information: Vec<LexOrdering>,
1035    ) -> Result<MemorySourceConfig> {
1036        let partitions = vec![
1037            vec![
1038                batch(100_000),
1039                batch(1),
1040                batch(1),
1041                batch(1),
1042                batch(1),
1043                batch(0),
1044            ],
1045            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1046        ];
1047        MemorySourceConfig::try_new(&partitions, schema(), None)?
1048            .try_with_sort_information(sort_information)
1049    }
1050
1051    /// Assert that we get the expected count of partitions after repartitioning.
1052    ///
1053    /// If None, then we expected the [`DataSource::repartitioned`] to return None.
1054    fn assert_partitioning(
1055        partitioned_datasrc: Option<Arc<dyn DataSource>>,
1056        partition_cnt: Option<usize>,
1057    ) {
1058        let should_exist = if let Some(partition_cnt) = partition_cnt {
1059            format!("new datasource should exist and have {partition_cnt:?} partitions")
1060        } else {
1061            "new datasource should not exist".into()
1062        };
1063
1064        let actual = partitioned_datasrc
1065            .map(|datasrc| datasrc.output_partitioning().partition_count());
1066        assert_eq!(
1067            actual,
1068            partition_cnt,
1069            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1070        );
1071    }
1072
1073    fn run_all_test_scenarios(
1074        output_ordering: Option<LexOrdering>,
1075        sort_information_on_config: Vec<LexOrdering>,
1076    ) -> Result<()> {
1077        let not_used = usize::MAX;
1078
1079        // src has no partitions
1080        let mem_src_config =
1081            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1082        let partitioned_datasrc =
1083            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1084        assert_partitioning(partitioned_datasrc, None);
1085
1086        // src has partitions == target partitions (=1)
1087        let target_partitions = 1;
1088        let mem_src_config =
1089            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1090        let partitioned_datasrc = mem_src_config.repartitioned(
1091            target_partitions,
1092            not_used,
1093            output_ordering.clone(),
1094        )?;
1095        assert_partitioning(partitioned_datasrc, None);
1096
1097        // src has partitions == target partitions (=3)
1098        let target_partitions = 3;
1099        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1100            sort_information_on_config.clone(),
1101        )?;
1102        let partitioned_datasrc = mem_src_config.repartitioned(
1103            target_partitions,
1104            not_used,
1105            output_ordering.clone(),
1106        )?;
1107        assert_partitioning(partitioned_datasrc, None);
1108
1109        // src has partitions > target partitions, but we don't merge them
1110        let target_partitions = 2;
1111        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1112            sort_information_on_config.clone(),
1113        )?;
1114        let partitioned_datasrc = mem_src_config.repartitioned(
1115            target_partitions,
1116            not_used,
1117            output_ordering.clone(),
1118        )?;
1119        assert_partitioning(partitioned_datasrc, None);
1120
1121        // src has partitions < target partitions, but not enough batches (per partition) to split into more partitions
1122        let target_partitions = 4;
1123        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1124            sort_information_on_config.clone(),
1125        )?;
1126        let partitioned_datasrc = mem_src_config.repartitioned(
1127            target_partitions,
1128            not_used,
1129            output_ordering.clone(),
1130        )?;
1131        assert_partitioning(partitioned_datasrc, None);
1132
1133        // src has partitions < target partitions, and can split to sufficient amount
1134        // has 6 batches across 3 partitions. Will need to split 2 of it's partitions.
1135        let target_partitions = 5;
1136        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1137            sort_information_on_config.clone(),
1138        )?;
1139        let partitioned_datasrc = mem_src_config.repartitioned(
1140            target_partitions,
1141            not_used,
1142            output_ordering.clone(),
1143        )?;
1144        assert_partitioning(partitioned_datasrc, Some(5));
1145
1146        // src has partitions < target partitions, and can split to sufficient amount
1147        // has 6 batches across 3 partitions. Will need to split all of it's partitions.
1148        let target_partitions = 6;
1149        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1150            sort_information_on_config.clone(),
1151        )?;
1152        let partitioned_datasrc = mem_src_config.repartitioned(
1153            target_partitions,
1154            not_used,
1155            output_ordering.clone(),
1156        )?;
1157        assert_partitioning(partitioned_datasrc, Some(6));
1158
1159        // src has partitions < target partitions, but not enough total batches to fulfill the split (desired target_partitions)
1160        let target_partitions = 3 * 2 + 1;
1161        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1162            sort_information_on_config.clone(),
1163        )?;
1164        let partitioned_datasrc = mem_src_config.repartitioned(
1165            target_partitions,
1166            not_used,
1167            output_ordering.clone(),
1168        )?;
1169        assert_partitioning(partitioned_datasrc, None);
1170
1171        // src has 1 partition with many batches of lopsided sizes
1172        // make sure it handles the split properly
1173        let target_partitions = 2;
1174        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1175            sort_information_on_config,
1176        )?;
1177        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1178            target_partitions,
1179            not_used,
1180            output_ordering,
1181        )?;
1182        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1183        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
1184        // It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
1185        let partitioned_datasrc = partitioned_datasrc.unwrap();
1186        let Some(mem_src_config) = partitioned_datasrc
1187            .as_any()
1188            .downcast_ref::<MemorySourceConfig>()
1189        else {
1190            unreachable!()
1191        };
1192        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1193        assert_eq!(repartitioned_raw_batches.len(), 2);
1194        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1195            unreachable!()
1196        };
1197        // p1=batch(100_000)
1198        assert_eq!(p1.len(), 1);
1199        assert_eq!(p1[0].num_rows(), 100_000);
1200        // p2=[batch(10_000), batch(100), batch(1)]
1201        assert_eq!(p2.len(), 3);
1202        assert_eq!(p2[0].num_rows(), 10_000);
1203        assert_eq!(p2[1].num_rows(), 100);
1204        assert_eq!(p2[2].num_rows(), 1);
1205
1206        Ok(())
1207    }
1208
1209    #[test]
1210    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
1211        let no_sort = vec![];
1212        let no_output_ordering = None;
1213
1214        // Test: common set of functionality
1215        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
1216
1217        // Test: how no-sort-order divides differently.
1218        //    * does not preserve separate partitions (with own internal ordering) on even split,
1219        //    * nor does it preserve ordering (re-orders batch(2_000) vs batch(1_000)).
1220        let target_partitions = 3;
1221        let mem_src_config =
1222            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
1223        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1224            target_partitions,
1225            usize::MAX,
1226            no_output_ordering,
1227        )?;
1228        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1229        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1230        // It should have split as p1=batch(100_000), p2=batch(10_000),  p3=rest(mixed across original partitions)
1231        let repartitioned_raw_batches = mem_src_config
1232            .repartition_evenly_by_size(target_partitions)?
1233            .unwrap();
1234        assert_eq!(repartitioned_raw_batches.len(), 3);
1235        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1236            unreachable!()
1237        };
1238        // p1=batch(100_000)
1239        assert_eq!(p1.len(), 1);
1240        assert_eq!(p1[0].num_rows(), 100_000);
1241        // p2=batch(10_000)
1242        assert_eq!(p2.len(), 1);
1243        assert_eq!(p2[0].num_rows(), 10_000);
1244        // p3= batch(2_000), batch(1_000), batch(20)
1245        assert_eq!(p3.len(), 3);
1246        assert_eq!(p3[0].num_rows(), 2_000);
1247        assert_eq!(p3[1].num_rows(), 1_000);
1248        assert_eq!(p3[2].num_rows(), 20);
1249
1250        Ok(())
1251    }
1252
1253    #[test]
1254    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
1255    ) -> Result<()> {
1256        let no_sort = vec![];
1257        let no_output_ordering = None;
1258
1259        // Test: case has two input partitions:
1260        //     b(100_000), b(1), b(1), b(1), b(1), b(0)
1261        //     b(1), b(1), b(1), b(1), b(0), b(100)
1262        //
1263        // We want an output with target_partitions=5, which means the ideal division is:
1264        //     b(100_000)
1265        //     b(100)
1266        //     b(1), b(1), b(1)
1267        //     b(1), b(1), b(1)
1268        //     b(1), b(1), b(0)
1269        let target_partitions = 5;
1270        let mem_src_config =
1271            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
1272        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1273            target_partitions,
1274            usize::MAX,
1275            no_output_ordering,
1276        )?;
1277        assert_partitioning(partitioned_datasrc.clone(), Some(5));
1278        // Starting partition 1 = batch(100_000), batch(1), batch(1), batch(1), batch(1), batch(0)
1279        // Starting partition 1 = batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)
1280        // It should have split as p1=batch(100_000), p2=batch(100), p3=[batch(1),batch(1)], p4=[batch(1),batch(1)], p5=[batch(1),batch(1),batch(0),batch(0)]
1281        let repartitioned_raw_batches = mem_src_config
1282            .repartition_evenly_by_size(target_partitions)?
1283            .unwrap();
1284        assert_eq!(repartitioned_raw_batches.len(), 5);
1285        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
1286        else {
1287            unreachable!()
1288        };
1289        // p1=batch(100_000)
1290        assert_eq!(p1.len(), 1);
1291        assert_eq!(p1[0].num_rows(), 100_000);
1292        // p2=batch(100)
1293        assert_eq!(p2.len(), 1);
1294        assert_eq!(p2[0].num_rows(), 100);
1295        // p3=[batch(1),batch(1),batch(1)]
1296        assert_eq!(p3.len(), 3);
1297        assert_eq!(p3[0].num_rows(), 1);
1298        assert_eq!(p3[1].num_rows(), 1);
1299        assert_eq!(p3[2].num_rows(), 1);
1300        // p4=[batch(1),batch(1),batch(1)]
1301        assert_eq!(p4.len(), 3);
1302        assert_eq!(p4[0].num_rows(), 1);
1303        assert_eq!(p4[1].num_rows(), 1);
1304        assert_eq!(p4[2].num_rows(), 1);
1305        // p5=[batch(1),batch(1),batch(0),batch(0)]
1306        assert_eq!(p5.len(), 4);
1307        assert_eq!(p5[0].num_rows(), 1);
1308        assert_eq!(p5[1].num_rows(), 1);
1309        assert_eq!(p5[2].num_rows(), 0);
1310        assert_eq!(p5[3].num_rows(), 0);
1311
1312        Ok(())
1313    }
1314
1315    #[test]
1316    fn test_repartition_with_sort_information() -> Result<()> {
1317        let schema = schema();
1318        let sort_key: LexOrdering =
1319            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1320        let has_sort = vec![sort_key.clone()];
1321        let output_ordering = Some(sort_key);
1322
1323        // Test: common set of functionality
1324        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
1325
1326        // Test: DOES preserve separate partitions (with own internal ordering)
1327        let target_partitions = 3;
1328        let mem_src_config =
1329            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
1330        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1331            target_partitions,
1332            usize::MAX,
1333            output_ordering.clone(),
1334        )?;
1335        assert_partitioning(partitioned_datasrc.clone(), Some(3));
1336        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
1337        // It should have split as p1=batch(100_000), p2=[batch(10_000),batch(1_000)],  p3=<other_partition>
1338        let Some(output_ord) = output_ordering else {
1339            unreachable!()
1340        };
1341        let repartitioned_raw_batches = mem_src_config
1342            .repartition_preserving_order(target_partitions, output_ord)?
1343            .unwrap();
1344        assert_eq!(repartitioned_raw_batches.len(), 3);
1345        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1346            unreachable!()
1347        };
1348        // p1=batch(100_000)
1349        assert_eq!(p1.len(), 1);
1350        assert_eq!(p1[0].num_rows(), 100_000);
1351        // p2=[batch(10_000),batch(1_000)]
1352        assert_eq!(p2.len(), 2);
1353        assert_eq!(p2[0].num_rows(), 10_000);
1354        assert_eq!(p2[1].num_rows(), 1_000);
1355        // p3=batch(2_000), batch(20)
1356        assert_eq!(p3.len(), 2);
1357        assert_eq!(p3[0].num_rows(), 2_000);
1358        assert_eq!(p3[1].num_rows(), 20);
1359
1360        Ok(())
1361    }
1362
1363    #[test]
1364    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1365        let schema = schema();
1366        let sort_key: LexOrdering =
1367            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1368        let has_sort = vec![sort_key.clone()];
1369        let output_ordering = Some(sort_key);
1370
1371        // src has 1 partition with many batches of lopsided sizes
1372        // note that the input vector of batches are not ordered by decreasing size
1373        let target_partitions = 2;
1374        let mem_src_config =
1375            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1376        let partitioned_datasrc = mem_src_config.clone().repartitioned(
1377            target_partitions,
1378            usize::MAX,
1379            output_ordering,
1380        )?;
1381        assert_partitioning(partitioned_datasrc.clone(), Some(2));
1382        // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
1383        // It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
1384        let partitioned_datasrc = partitioned_datasrc.unwrap();
1385        let Some(mem_src_config) = partitioned_datasrc
1386            .as_any()
1387            .downcast_ref::<MemorySourceConfig>()
1388        else {
1389            unreachable!()
1390        };
1391        let repartitioned_raw_batches = mem_src_config.partitions.clone();
1392        assert_eq!(repartitioned_raw_batches.len(), 2);
1393        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1394            unreachable!()
1395        };
1396        // p1=batch(100_000)
1397        assert_eq!(p1.len(), 1);
1398        assert_eq!(p1[0].num_rows(), 100_000);
1399        // p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
1400        assert_eq!(p2.len(), 3);
1401        assert_eq!(p2[0].num_rows(), 1);
1402        assert_eq!(p2[1].num_rows(), 100);
1403        assert_eq!(p2[2].num_rows(), 10_000);
1404
1405        Ok(())
1406    }
1407}