datafusion_datasource/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use crate::sink::DataSink;
26use crate::source::{DataSource, DataSourceExec};
27use async_trait::async_trait;
28use datafusion_physical_plan::memory::MemoryStream;
29use datafusion_physical_plan::projection::{
30    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
31};
32use datafusion_physical_plan::{
33    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
34    PhysicalExpr, SendableRecordBatchStream, Statistics,
35};
36
37use arrow::array::{RecordBatch, RecordBatchOptions};
38use arrow::datatypes::{Schema, SchemaRef};
39use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::equivalence::ProjectionMapping;
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::utils::collect_columns;
44use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
45use futures::StreamExt;
46use itertools::Itertools;
47use tokio::sync::RwLock;
48
/// Data source configuration for reading in-memory batches of data
///
/// Constructed with [`MemorySourceConfig::try_new`] (or one of the other
/// `try_new_*` constructors); `try_new_exec` wraps it in a [`DataSourceExec`].
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query.
    ///
    /// Each partition is a `Vec<RecordBatch>`; `open` streams exactly one
    /// partition.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection: indices into `schema` of the columns to output.
    /// `None` outputs every column of `schema`.
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings, expressed against
    /// `projected_schema` (`try_with_sort_information` projects them when a
    /// projection is present).
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes (batch counts per partition) are displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records are returned. `open` applies it to each partition's stream.
    fetch: Option<usize>,
}
70
71impl DataSource for MemorySourceConfig {
72    fn open(
73        &self,
74        partition: usize,
75        _context: Arc<TaskContext>,
76    ) -> Result<SendableRecordBatchStream> {
77        Ok(Box::pin(
78            MemoryStream::try_new(
79                self.partitions[partition].clone(),
80                Arc::clone(&self.projected_schema),
81                self.projection.clone(),
82            )?
83            .with_fetch(self.fetch),
84        ))
85    }
86
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90
91    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
92        match t {
93            DisplayFormatType::Default | DisplayFormatType::Verbose => {
94                let partition_sizes: Vec<_> =
95                    self.partitions.iter().map(|b| b.len()).collect();
96
97                let output_ordering = self
98                    .sort_information
99                    .first()
100                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
101                    .unwrap_or_default();
102
103                let eq_properties = self.eq_properties();
104                let constraints = eq_properties.constraints();
105                let constraints = if constraints.is_empty() {
106                    String::new()
107                } else {
108                    format!(", {constraints}")
109                };
110
111                let limit = self
112                    .fetch
113                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
114                if self.show_sizes {
115                    write!(
116                                f,
117                                "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
118                                partition_sizes.len(),
119                            )
120                } else {
121                    write!(
122                        f,
123                        "partitions={}{limit}{output_ordering}{constraints}",
124                        partition_sizes.len(),
125                    )
126                }
127            }
128            DisplayFormatType::TreeRender => {
129                let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
130                let total_bytes: usize = self
131                    .partitions
132                    .iter()
133                    .flatten()
134                    .map(|batch| batch.get_array_memory_size())
135                    .sum();
136                writeln!(f, "format=memory")?;
137                writeln!(f, "rows={total_rows}")?;
138                writeln!(f, "bytes={total_bytes}")?;
139                Ok(())
140            }
141        }
142    }
143
144    /// If possible, redistribute batches across partitions according to their size.
145    ///
146    /// Returns `Ok(None)` if unable to repartition. Preserve output ordering if exists.
147    /// Refer to [`DataSource::repartitioned`] for further details.
148    fn repartitioned(
149        &self,
150        target_partitions: usize,
151        _repartition_file_min_size: usize,
152        output_ordering: Option<LexOrdering>,
153    ) -> Result<Option<Arc<dyn DataSource>>> {
154        if self.partitions.is_empty() || self.partitions.len() >= target_partitions
155        // if have no partitions, or already have more partitions than desired, do not repartition
156        {
157            return Ok(None);
158        }
159
160        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
161            self.repartition_preserving_order(target_partitions, output_ordering)?
162        } else {
163            self.repartition_evenly_by_size(target_partitions)?
164        };
165
166        if let Some(repartitioned) = maybe_repartitioned {
167            Ok(Some(Arc::new(Self::try_new(
168                &repartitioned,
169                self.original_schema(),
170                self.projection.clone(),
171            )?)))
172        } else {
173            Ok(None)
174        }
175    }
176
177    fn output_partitioning(&self) -> Partitioning {
178        Partitioning::UnknownPartitioning(self.partitions.len())
179    }
180
181    fn eq_properties(&self) -> EquivalenceProperties {
182        EquivalenceProperties::new_with_orderings(
183            Arc::clone(&self.projected_schema),
184            self.sort_information.as_slice(),
185        )
186    }
187
188    fn statistics(&self) -> Result<Statistics> {
189        Ok(common::compute_record_batch_statistics(
190            &self.partitions,
191            &self.schema,
192            self.projection.clone(),
193        ))
194    }
195
196    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
197        let source = self.clone();
198        Some(Arc::new(source.with_limit(limit)))
199    }
200
201    fn fetch(&self) -> Option<usize> {
202        self.fetch
203    }
204
205    fn try_swapping_with_projection(
206        &self,
207        projection: &ProjectionExec,
208    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
209        // If there is any non-column or alias-carrier expression, Projection should not be removed.
210        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
211        all_alias_free_columns(projection.expr())
212            .then(|| {
213                let all_projections = (0..self.schema.fields().len()).collect();
214                let new_projections = new_projections_for_columns(
215                    projection,
216                    self.projection().as_ref().unwrap_or(&all_projections),
217                );
218
219                MemorySourceConfig::try_new_exec(
220                    self.partitions(),
221                    self.original_schema(),
222                    Some(new_projections),
223                )
224                .map(|e| e as _)
225            })
226            .transpose()
227    }
228}
229
230impl MemorySourceConfig {
231    /// Create a new `MemorySourceConfig` for reading in-memory record batches
232    /// The provided `schema` should not have the projection applied.
233    pub fn try_new(
234        partitions: &[Vec<RecordBatch>],
235        schema: SchemaRef,
236        projection: Option<Vec<usize>>,
237    ) -> Result<Self> {
238        let projected_schema = project_schema(&schema, projection.as_ref())?;
239        Ok(Self {
240            partitions: partitions.to_vec(),
241            schema,
242            projected_schema,
243            projection,
244            sort_information: vec![],
245            show_sizes: true,
246            fetch: None,
247        })
248    }
249
250    /// Create a new `DataSourceExec` plan for reading in-memory record batches
251    /// The provided `schema` should not have the projection applied.
252    pub fn try_new_exec(
253        partitions: &[Vec<RecordBatch>],
254        schema: SchemaRef,
255        projection: Option<Vec<usize>>,
256    ) -> Result<Arc<DataSourceExec>> {
257        let source = Self::try_new(partitions, schema, projection)?;
258        Ok(DataSourceExec::from_data_source(source))
259    }
260
261    /// Create a new execution plan from a list of constant values (`ValuesExec`)
262    pub fn try_new_as_values(
263        schema: SchemaRef,
264        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
265    ) -> Result<Arc<DataSourceExec>> {
266        if data.is_empty() {
267            return plan_err!("Values list cannot be empty");
268        }
269
270        let n_row = data.len();
271        let n_col = schema.fields().len();
272
273        // We have this single row batch as a placeholder to satisfy evaluation argument
274        // and generate a single output row
275        let placeholder_schema = Arc::new(Schema::empty());
276        let placeholder_batch = RecordBatch::try_new_with_options(
277            Arc::clone(&placeholder_schema),
278            vec![],
279            &RecordBatchOptions::new().with_row_count(Some(1)),
280        )?;
281
282        // Evaluate each column
283        let arrays = (0..n_col)
284            .map(|j| {
285                (0..n_row)
286                    .map(|i| {
287                        let expr = &data[i][j];
288                        let result = expr.evaluate(&placeholder_batch)?;
289
290                        match result {
291                            ColumnarValue::Scalar(scalar) => Ok(scalar),
292                            ColumnarValue::Array(array) if array.len() == 1 => {
293                                ScalarValue::try_from_array(&array, 0)
294                            }
295                            ColumnarValue::Array(_) => {
296                                plan_err!("Cannot have array values in a values list")
297                            }
298                        }
299                    })
300                    .collect::<Result<Vec<_>>>()
301                    .and_then(ScalarValue::iter_to_array)
302            })
303            .collect::<Result<Vec<_>>>()?;
304
305        let batch = RecordBatch::try_new_with_options(
306            Arc::clone(&schema),
307            arrays,
308            &RecordBatchOptions::new().with_row_count(Some(n_row)),
309        )?;
310
311        let partitions = vec![batch];
312        Self::try_new_from_batches(Arc::clone(&schema), partitions)
313    }
314
315    /// Create a new plan using the provided schema and batches.
316    ///
317    /// Errors if any of the batches don't match the provided schema, or if no
318    /// batches are provided.
319    pub fn try_new_from_batches(
320        schema: SchemaRef,
321        batches: Vec<RecordBatch>,
322    ) -> Result<Arc<DataSourceExec>> {
323        if batches.is_empty() {
324            return plan_err!("Values list cannot be empty");
325        }
326
327        for batch in &batches {
328            let batch_schema = batch.schema();
329            if batch_schema != schema {
330                return plan_err!(
331                    "Batch has invalid schema. Expected: {}, got: {}",
332                    schema,
333                    batch_schema
334                );
335            }
336        }
337
338        let partitions = vec![batches];
339        let source = Self {
340            partitions,
341            schema: Arc::clone(&schema),
342            projected_schema: Arc::clone(&schema),
343            projection: None,
344            sort_information: vec![],
345            show_sizes: true,
346            fetch: None,
347        };
348        Ok(DataSourceExec::from_data_source(source))
349    }
350
351    /// Set the limit of the files
352    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
353        self.fetch = limit;
354        self
355    }
356
357    /// Set `show_sizes` to determine whether to display partition sizes
358    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
359        self.show_sizes = show_sizes;
360        self
361    }
362
363    /// Ref to partitions
364    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
365        &self.partitions
366    }
367
368    /// Ref to projection
369    pub fn projection(&self) -> &Option<Vec<usize>> {
370        &self.projection
371    }
372
373    /// Show sizes
374    pub fn show_sizes(&self) -> bool {
375        self.show_sizes
376    }
377
378    /// Ref to sort information
379    pub fn sort_information(&self) -> &[LexOrdering] {
380        &self.sort_information
381    }
382
383    /// A memory table can be ordered by multiple expressions simultaneously.
384    /// [`EquivalenceProperties`] keeps track of expressions that describe the
385    /// global ordering of the schema. These columns are not necessarily same; e.g.
386    /// ```text
387    /// ┌-------┐
388    /// | a | b |
389    /// |---|---|
390    /// | 1 | 9 |
391    /// | 2 | 8 |
392    /// | 3 | 7 |
393    /// | 5 | 5 |
394    /// └---┴---┘
395    /// ```
396    /// where both `a ASC` and `b DESC` can describe the table ordering. With
397    /// [`EquivalenceProperties`], we can keep track of these equivalences
398    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
399    ///
400    /// Note that if there is an internal projection, that projection will be
401    /// also applied to the given `sort_information`.
402    pub fn try_with_sort_information(
403        mut self,
404        mut sort_information: Vec<LexOrdering>,
405    ) -> Result<Self> {
406        // All sort expressions must refer to the original schema
407        let fields = self.schema.fields();
408        let ambiguous_column = sort_information
409            .iter()
410            .flat_map(|ordering| ordering.clone())
411            .flat_map(|expr| collect_columns(&expr.expr))
412            .find(|col| {
413                fields
414                    .get(col.index())
415                    .map(|field| field.name() != col.name())
416                    .unwrap_or(true)
417            });
418        if let Some(col) = ambiguous_column {
419            return internal_err!(
420                "Column {:?} is not found in the original schema of the MemorySourceConfig",
421                col
422            );
423        }
424
425        // If there is a projection on the source, we also need to project orderings
426        if let Some(projection) = &self.projection {
427            let base_eqp = EquivalenceProperties::new_with_orderings(
428                self.original_schema(),
429                &sort_information,
430            );
431            let proj_exprs = projection
432                .iter()
433                .map(|idx| {
434                    let base_schema = self.original_schema();
435                    let name = base_schema.field(*idx).name();
436                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
437                })
438                .collect::<Vec<_>>();
439            let projection_mapping =
440                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
441            sort_information = base_eqp
442                .project(&projection_mapping, Arc::clone(&self.projected_schema))
443                .into_oeq_class()
444                .into_inner();
445        }
446
447        self.sort_information = sort_information;
448        Ok(self)
449    }
450
451    /// Arc clone of ref to original schema
452    pub fn original_schema(&self) -> SchemaRef {
453        Arc::clone(&self.schema)
454    }
455
456    /// Repartition while preserving order.
457    ///
458    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
459    /// as having too few batches to fulfill the `target_partitions` or if unable
460    /// to preserve output ordering.
461    fn repartition_preserving_order(
462        &self,
463        target_partitions: usize,
464        output_ordering: LexOrdering,
465    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
466        if !self.eq_properties().ordering_satisfy(&output_ordering) {
467            Ok(None)
468        } else {
469            let total_num_batches =
470                self.partitions.iter().map(|b| b.len()).sum::<usize>();
471            if total_num_batches < target_partitions {
472                // no way to create the desired repartitioning
473                return Ok(None);
474            }
475
476            let cnt_to_repartition = target_partitions - self.partitions.len();
477
478            // Label the current partitions and their order.
479            // Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
480            let to_repartition = self
481                .partitions
482                .iter()
483                .enumerate()
484                .map(|(idx, batches)| RePartition {
485                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
486                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
487                    batches: batches.clone(),
488                })
489                .collect_vec();
490
491            // Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
492            // by count of rows.
493            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
494            for rep in to_repartition {
495                max_heap.push(rep);
496            }
497
498            // Split the largest partitions into smaller partitions. Maintaining the output
499            // order of the partitions & newly created partitions.
500            let mut cannot_split_further = Vec::with_capacity(target_partitions);
501            for _ in 0..cnt_to_repartition {
502                // triggers loop for the cnt_to_repartition. So if need another 4 partitions, it attempts to split 4 times.
503                loop {
504                    // Take the largest item off the heap, and attempt to split.
505                    let Some(to_split) = max_heap.pop() else {
506                        // Nothing left to attempt repartition. Break inner loop.
507                        break;
508                    };
509
510                    // Split the partition. The new partitions will be ordered with idx and idx+1.
511                    let mut new_partitions = to_split.split();
512                    if new_partitions.len() > 1 {
513                        for new_partition in new_partitions {
514                            max_heap.push(new_partition);
515                        }
516                        // Successful repartition. Break inner loop, and return to outer `cnt_to_repartition` loop.
517                        break;
518                    } else {
519                        cannot_split_further.push(new_partitions.remove(0));
520                    }
521                }
522            }
523            let mut partitions = max_heap.drain().collect_vec();
524            partitions.extend(cannot_split_further);
525
526            // Finally, sort all partitions by the output ordering.
527            // This was the original ordering of the batches within the partition. We are maintaining this ordering.
528            partitions.sort_by_key(|p| p.idx);
529            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
530
531            Ok(Some(partitions))
532        }
533    }
534
535    /// Repartition into evenly sized chunks (as much as possible without batch splitting),
536    /// disregarding any ordering.
537    ///
538    /// Current implementation uses a first-fit-decreasing bin packing, modified to enable
539    /// us to still return the desired count of `target_partitions`.
540    ///
541    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
542    /// as having too few batches to fulfill the `target_partitions`.
543    fn repartition_evenly_by_size(
544        &self,
545        target_partitions: usize,
546    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
547        // determine if we have enough total batches to fulfill request
548        let mut flatten_batches =
549            self.partitions.clone().into_iter().flatten().collect_vec();
550        if flatten_batches.len() < target_partitions {
551            return Ok(None);
552        }
553
554        // Take all flattened batches (all in 1 partititon/vec) and divide evenly into the desired number of `target_partitions`.
555        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
556        // sort by size, so we pack multiple smaller batches into the same partition
557        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
558
559        // Divide.
560        let mut partitions =
561            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
562        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
563        let mut total_rows_seen = 0;
564        let mut curr_bin_row_count = 0;
565        let mut idx = 0;
566        for batch in flatten_batches {
567            let row_cnt = batch.num_rows();
568            idx = std::cmp::min(idx, target_partitions - 1);
569
570            partitions[idx].push(batch);
571            curr_bin_row_count += row_cnt;
572            total_rows_seen += row_cnt;
573
574            if curr_bin_row_count >= target_partition_size {
575                idx += 1;
576                curr_bin_row_count = 0;
577
578                // update target_partition_size, to handle very lopsided batch distributions
579                // while still returning the count of `target_partitions`
580                if total_rows_seen < total_num_rows {
581                    target_partition_size = (total_num_rows - total_rows_seen)
582                        .div_ceil(target_partitions - idx);
583                }
584            }
585        }
586
587        Ok(Some(partitions))
588    }
589}
590
591/// For use in repartitioning, track the total size and original partition index.
592///
593/// Do not implement clone, in order to avoid unnecessary copying during repartitioning.
594struct RePartition {
595    /// Original output ordering for the partition.
596    idx: usize,
597    /// Total size of the partition, for use in heap ordering
598    /// (a.k.a. splitting up the largest partitions).
599    row_count: usize,
600    /// A partition containing record batches.
601    batches: Vec<RecordBatch>,
602}
603
604impl RePartition {
605    /// Split [`RePartition`] into 2 pieces, consuming self.
606    ///
607    /// Returns only 1 partition if cannot be split further.
608    fn split(self) -> Vec<Self> {
609        if self.batches.len() == 1 {
610            return vec![self];
611        }
612
613        let new_0 = RePartition {
614            idx: self.idx, // output ordering
615            row_count: 0,
616            batches: vec![],
617        };
618        let new_1 = RePartition {
619            idx: self.idx + 1, // output ordering +1
620            row_count: 0,
621            batches: vec![],
622        };
623        let split_pt = self.row_count / 2;
624
625        let [new_0, new_1] = self.batches.into_iter().fold(
626            [new_0, new_1],
627            |[mut new0, mut new1], batch| {
628                if new0.row_count < split_pt {
629                    new0.add_batch(batch);
630                } else {
631                    new1.add_batch(batch);
632                }
633                [new0, new1]
634            },
635        );
636        vec![new_0, new_1]
637    }
638
639    fn add_batch(&mut self, batch: RecordBatch) {
640        self.row_count += batch.num_rows();
641        self.batches.push(batch);
642    }
643}
644
645impl PartialOrd for RePartition {
646    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
647        Some(self.row_count.cmp(&other.row_count))
648    }
649}
650
651impl Ord for RePartition {
652    fn cmp(&self, other: &Self) -> Ordering {
653        self.row_count.cmp(&other.row_count)
654    }
655}
656
657impl PartialEq for RePartition {
658    fn eq(&self, other: &Self) -> bool {
659        self.row_count.eq(&other.row_count)
660    }
661}
662
// Equality compares only `row_count` (a `usize`), which is reflexive,
// symmetric and transitive, so marking the relation as total is sound and
// satisfies the `Ord` supertrait requirement.
impl Eq for RePartition {}
664
665impl fmt::Display for RePartition {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        write!(
668            f,
669            "{}rows-in-{}batches@{}",
670            self.row_count,
671            self.batches.len(),
672            self.idx
673        )
674    }
675}
676
/// Type alias for partition data
///
/// The batches of one partition behind a shared async [`RwLock`];
/// [`MemSink`] appends to them while holding the write lock.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
679
680/// Implements for writing to a [`MemTable`]
681///
682/// [`MemTable`]: <https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
683pub struct MemSink {
684    /// Target locations for writing data
685    batches: Vec<PartitionData>,
686    schema: SchemaRef,
687}
688
689impl Debug for MemSink {
690    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691        f.debug_struct("MemSink")
692            .field("num_partitions", &self.batches.len())
693            .finish()
694    }
695}
696
697impl DisplayAs for MemSink {
698    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699        match t {
700            DisplayFormatType::Default | DisplayFormatType::Verbose => {
701                let partition_count = self.batches.len();
702                write!(f, "MemoryTable (partitions={partition_count})")
703            }
704            DisplayFormatType::TreeRender => {
705                // TODO: collect info
706                write!(f, "")
707            }
708        }
709    }
710}
711
712impl MemSink {
713    /// Creates a new [`MemSink`].
714    ///
715    /// The caller is responsible for ensuring that there is at least one partition to insert into.
716    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
717        if batches.is_empty() {
718            return plan_err!("Cannot insert into MemTable with zero partitions");
719        }
720        Ok(Self { batches, schema })
721    }
722}
723
724#[async_trait]
725impl DataSink for MemSink {
726    fn as_any(&self) -> &dyn Any {
727        self
728    }
729
730    fn schema(&self) -> &SchemaRef {
731        &self.schema
732    }
733
734    async fn write_all(
735        &self,
736        mut data: SendableRecordBatchStream,
737        _context: &Arc<TaskContext>,
738    ) -> Result<u64> {
739        let num_partitions = self.batches.len();
740
741        // buffer up the data round robin style into num_partitions
742
743        let mut new_batches = vec![vec![]; num_partitions];
744        let mut i = 0;
745        let mut row_count = 0;
746        while let Some(batch) = data.next().await.transpose()? {
747            row_count += batch.num_rows();
748            new_batches[i].push(batch);
749            i = (i + 1) % num_partitions;
750        }
751
752        // write the outputs into the batches
753        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
754            // Append all the new batches in one go to minimize locking overhead
755            target.write().await.append(&mut batches);
756        }
757
758        Ok(row_count as u64)
759    }
760}
761
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    /// Two orderings supplied as sort information are both kept in the
    /// equivalence class, and the reported output ordering is their
    /// concatenation.
    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let fields = ["a", "b", "c"]
            .into_iter()
            .map(|name| Field::new(name, DataType::Int64, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));

        // Sort expression on the named column with default options.
        let asc = |name: &str| -> datafusion_common::Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };

        let sort_ab = LexOrdering::new(vec![asc("a")?, asc("b")?]);
        let sort_c = LexOrdering::new(vec![asc("c")?]);

        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort_ab.clone());
        expected_output_order.extend(sort_c.clone());

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort_ab.clone(), sort_c.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        let properties = mem_exec.properties();
        assert_eq!(
            properties.output_ordering().unwrap(),
            &expected_output_order
        );
        let oeq_class = properties.equivalence_properties().oeq_class();
        assert!(oeq_class.contains(&sort_ab));
        assert!(oeq_class.contains(&sort_c));
        Ok(())
    }
}
817
#[cfg(test)]
mod tests {
    use crate::test_util::col;
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    /// A `fetch` limit applied with `with_fetch` is reported by `fetch()` and
    /// caps the rows produced when partition 0 is executed.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches)?;
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// Building a VALUES source from zero rows is rejected.
    ///
    /// Nothing here is asynchronous, so a plain `#[test]` is enough; no
    /// tokio runtime is needed.
    #[test]
    fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Building an exec from non-empty batches with a matching schema succeeds.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// Building an exec from an empty batch list is an error.
    #[test]
    fn new_exec_with_batches_empty() {
        // The batch is only built to obtain a valid schema.
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// Building an exec whose schema does not match the batches is an error.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    // Test issue: https://github.com/apache/datafusion/issues/8763
    /// A non-nullable column accepts a non-null literal and rejects a null one.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // Test that a null value is rejected
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics for a VALUES source made only of nulls report an exact row
    /// count and an exact null count equal to that row count.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.partition_statistics(None)?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8), // not important
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows), // there are only nulls
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }

    /// Builds a batch with `row_size` rows and three columns:
    /// `a: Int32` (all 1), `b: Utf8` (all "foo") and `c: Int64` (all 1).
    fn batch(row_size: usize) -> RecordBatch {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    }

    /// The schema shared by every batch produced by [`batch`].
    fn schema() -> SchemaRef {
        batch(1).schema()
    }

    /// Config with no partitions at all.
    fn memorysrcconfig_no_partitions(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Config with a single partition holding one 100-row batch.
    fn memorysrcconfig_1_partition_1_batch(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Config with three partitions, each holding one 100-row batch.
    fn memorysrcconfig_3_partitions_1_batch_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Config with three partitions, each holding two 100-row batches.
    fn memorysrcconfig_3_partitions_with_2_batches_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Batches of different sizes, with batches ordered by size (100_000, 10_000, 100, 1)
    /// in the Memtable partition (a.k.a. vector of batches).
    fn memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Same as [`memorysrcconfig_1_partition_with_different_sized_batches`],
    /// but the batches are ordered differently (not by size)
    /// in the Memtable partition (a.k.a. vector of batches).
    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Two partitions of differently sized batches:
    /// `[100_000, 10_000, 1_000]` and `[2_000, 20]` rows.
    fn memorysrcconfig_2_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100_000), batch(10_000), batch(1_000)],
            vec![batch(2_000), batch(20)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Two partitions dominated by one huge batch and many tiny and empty ones:
    /// `[100_000, 1, 1, 1, 1, 0]` and `[1, 1, 1, 1, 0, 100]` rows.
    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![
                batch(100_000),
                batch(1),
                batch(1),
                batch(1),
                batch(1),
                batch(0),
            ],
            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Assert that we get the expected count of partitions after repartitioning.
    ///
    /// If `partition_cnt` is None, then we expect [`DataSource::repartitioned`] to return None.
    fn assert_partitioning(
        partitioned_datasrc: Option<Arc<dyn DataSource>>,
        partition_cnt: Option<usize>,
    ) {
        let should_exist = if let Some(partition_cnt) = partition_cnt {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        } else {
            "new datasource should not exist".into()
        };

        let actual = partitioned_datasrc
            .map(|datasrc| datasrc.output_partitioning().partition_count());
        assert_eq!(
            actual,
            partition_cnt,
            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
        );
    }

    /// Runs the repartitioning scenarios shared by the sorted and unsorted tests.
    ///
    /// `output_ordering` is forwarded to `repartitioned`, and
    /// `sort_information_on_config` is attached to every config under test via
    /// `try_with_sort_information`.
    fn run_all_test_scenarios(
        output_ordering: Option<LexOrdering>,
        sort_information_on_config: Vec<LexOrdering>,
    ) -> Result<()> {
        // Placeholder for the second `repartitioned` argument; these scenarios
        // do not depend on its value.
        let not_used = usize::MAX;

        // src has no partitions
        let mem_src_config =
            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
        let partitioned_datasrc =
            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
        assert_partitioning(partitioned_datasrc, None);

        // src has partitions == target partitions (=1)
        let target_partitions = 1;
        let mem_src_config =
            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // src has partitions == target partitions (=3)
        let target_partitions = 3;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // src has partitions > target partitions, but we don't merge them
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // src has partitions < target partitions, but not enough batches (per partition) to split into more partitions
        let target_partitions = 4;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // src has partitions < target partitions, and can split to sufficient amount
        // has 6 batches across 3 partitions. Will need to split 2 of its partitions.
        let target_partitions = 5;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(5));

        // src has partitions < target partitions, and can split to sufficient amount
        // has 6 batches across 3 partitions. Will need to split all of its partitions.
        let target_partitions = 6;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(6));

        // src has partitions < target partitions, but not enough total batches to fulfill the split (desired target_partitions)
        let target_partitions = 3 * 2 + 1;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // src has 1 partition with many batches of lopsided sizes
        // make sure it handles the split properly
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
            sort_information_on_config,
        )?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            not_used,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
        // It should have split as p1=batch(100_000), p2=[batch(10_000), batch(100), batch(1)]
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let mem_src_config = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
            .expect("repartitioned data source should be a MemorySourceConfig");
        // Borrow the partitions; the assertions below only read them.
        let repartitioned_raw_batches = &mem_src_config.partitions;
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // p1=batch(100_000)
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // p2=[batch(10_000), batch(100), batch(1)]
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 1);

        Ok(())
    }

    #[test]
    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // Test: common set of functionality
        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

        // Test: how no-sort-order divides differently.
        //    * does not preserve separate partitions (with own internal ordering) on even split,
        //    * nor does it preserve ordering (re-orders batch(2_000) vs batch(1_000)).
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
        // It should have split as p1=batch(100_000), p2=batch(10_000), p3=rest(mixed across original partitions)
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // p1=batch(100_000)
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // p2=batch(10_000)
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 10_000);
        // p3= batch(2_000), batch(1_000), batch(20)
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 1_000);
        assert_eq!(p3[2].num_rows(), 20);

        Ok(())
    }

    #[test]
    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
    ) -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // Test: case has two input partitions:
        //     b(100_000), b(1), b(1), b(1), b(1), b(0)
        //     b(1), b(1), b(1), b(1), b(0), b(100)
        //
        // We want an output with target_partitions=5, which means the ideal division is:
        //     b(100_000)
        //     b(100)
        //     b(1), b(1), b(1)
        //     b(1), b(1), b(1)
        //     b(1), b(1), b(0), b(0)
        let target_partitions = 5;
        let mem_src_config =
            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Starting partition 1 = batch(100_000), batch(1), batch(1), batch(1), batch(1), batch(0)
        // Starting partition 2 = batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)
        // It should have split as p1=batch(100_000), p2=batch(100), p3=[batch(1),batch(1),batch(1)], p4=[batch(1),batch(1),batch(1)], p5=[batch(1),batch(1),batch(0),batch(0)]
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 5);
        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
        else {
            unreachable!()
        };
        // p1=batch(100_000)
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // p2=batch(100)
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 100);
        // p3=[batch(1),batch(1),batch(1)]
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 1);
        assert_eq!(p3[1].num_rows(), 1);
        assert_eq!(p3[2].num_rows(), 1);
        // p4=[batch(1),batch(1),batch(1)]
        assert_eq!(p4.len(), 3);
        assert_eq!(p4[0].num_rows(), 1);
        assert_eq!(p4[1].num_rows(), 1);
        assert_eq!(p4[2].num_rows(), 1);
        // p5=[batch(1),batch(1),batch(0),batch(0)]
        assert_eq!(p5.len(), 4);
        assert_eq!(p5[0].num_rows(), 1);
        assert_eq!(p5[1].num_rows(), 1);
        assert_eq!(p5[2].num_rows(), 0);
        assert_eq!(p5[3].num_rows(), 0);

        Ok(())
    }

    #[test]
    fn test_repartition_with_sort_information() -> Result<()> {
        let schema = schema();
        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // Test: common set of functionality
        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;

        // Test: DOES preserve separate partitions (with own internal ordering)
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Starting = batch(100_000), batch(10_000), batch(1_000), batch(2_000), batch(20)
        // It should have split as p1=batch(100_000), p2=[batch(10_000),batch(1_000)], p3=<other_partition>
        let Some(output_ord) = output_ordering else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config
            .repartition_preserving_order(target_partitions, output_ord)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // p1=batch(100_000)
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // p2=[batch(10_000),batch(1_000)]
        assert_eq!(p2.len(), 2);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 1_000);
        // p3=batch(2_000), batch(20)
        assert_eq!(p3.len(), 2);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 20);

        Ok(())
    }

    #[test]
    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
        let schema = schema();
        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // src has 1 partition with many batches of lopsided sizes
        // note that the input vector of batches are not ordered by decreasing size
        let target_partitions = 2;
        let mem_src_config =
            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Starting = batch(100_000), batch(1), batch(100), batch(10_000).
        // It should have split as p1=batch(100_000), p2=[batch(1), batch(100), batch(10_000)]
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let mem_src_config = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
            .expect("repartitioned data source should be a MemorySourceConfig");
        // Borrow the partitions; the assertions below only read them.
        let repartitioned_raw_batches = &mem_src_config.partitions;
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // p1=batch(100_000)
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // p2=[batch(1), batch(100), batch(10_000)] -- **this is preserving the partition order**
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 1);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 10_000);

        Ok(())
    }
}