1use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Deref;
24use std::slice::from_ref;
25use std::sync::Arc;
26
27use crate::sink::DataSink;
28use crate::source::{DataSource, DataSourceExec};
29
30use arrow::array::{RecordBatch, RecordBatchOptions};
31use arrow::datatypes::{Schema, SchemaRef};
32use datafusion_common::{
33 Result, ScalarValue, assert_or_internal_err, plan_err, project_schema,
34};
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::equivalence::project_orderings;
37use datafusion_physical_expr::projection::ProjectionExprs;
38use datafusion_physical_expr::utils::collect_columns;
39use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
40use datafusion_physical_plan::memory::MemoryStream;
41use datafusion_physical_plan::projection::{
42 all_alias_free_columns, new_projections_for_columns,
43};
44use datafusion_physical_plan::{
45 ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
46 SendableRecordBatchStream, Statistics, common,
47};
48
49use async_trait::async_trait;
50use datafusion_physical_plan::coop::cooperative;
51use datafusion_physical_plan::execution_plan::SchedulingType;
52use futures::StreamExt;
53use itertools::Itertools;
54use tokio::sync::RwLock;
55
/// Configuration of a data source that serves record batches held in memory.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The data, one inner `Vec` of record batches per partition.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the stored batches, before any projection is applied.
    schema: SchemaRef,
    /// Schema obtained by applying `projection` to `schema`.
    projected_schema: SchemaRef,
    /// Optional column indices (into `schema`) to project.
    projection: Option<Vec<usize>>,
    /// Sort orderings reported through the equivalence properties of this source.
    sort_information: Vec<LexOrdering>,
    /// Whether the default/verbose display includes the per-partition batch counts.
    show_sizes: bool,
    /// Optional limit handed to every opened stream via `with_fetch`.
    fetch: Option<usize>,
}
77
78impl DataSource for MemorySourceConfig {
79 fn open(
80 &self,
81 partition: usize,
82 _context: Arc<TaskContext>,
83 ) -> Result<SendableRecordBatchStream> {
84 Ok(Box::pin(cooperative(
85 MemoryStream::try_new(
86 self.partitions[partition].clone(),
87 Arc::clone(&self.projected_schema),
88 self.projection.clone(),
89 )?
90 .with_fetch(self.fetch),
91 )))
92 }
93
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97
98 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
99 match t {
100 DisplayFormatType::Default | DisplayFormatType::Verbose => {
101 let partition_sizes: Vec<_> =
102 self.partitions.iter().map(|b| b.len()).collect();
103
104 let output_ordering = self
105 .sort_information
106 .first()
107 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
108 .unwrap_or_default();
109
110 let eq_properties = self.eq_properties();
111 let constraints = eq_properties.constraints();
112 let constraints = if constraints.is_empty() {
113 String::new()
114 } else {
115 format!(", {constraints}")
116 };
117
118 let limit = self
119 .fetch
120 .map_or(String::new(), |limit| format!(", fetch={limit}"));
121 if self.show_sizes {
122 write!(
123 f,
124 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
125 partition_sizes.len(),
126 )
127 } else {
128 write!(
129 f,
130 "partitions={}{limit}{output_ordering}{constraints}",
131 partition_sizes.len(),
132 )
133 }
134 }
135 DisplayFormatType::TreeRender => {
136 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
137 let total_bytes: usize = self
138 .partitions
139 .iter()
140 .flatten()
141 .map(|batch| batch.get_array_memory_size())
142 .sum();
143 writeln!(f, "format=memory")?;
144 writeln!(f, "rows={total_rows}")?;
145 writeln!(f, "bytes={total_bytes}")?;
146 Ok(())
147 }
148 }
149 }
150
151 fn repartitioned(
156 &self,
157 target_partitions: usize,
158 _repartition_file_min_size: usize,
159 output_ordering: Option<LexOrdering>,
160 ) -> Result<Option<Arc<dyn DataSource>>> {
161 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
162 {
164 return Ok(None);
165 }
166
167 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
168 self.repartition_preserving_order(target_partitions, output_ordering)?
169 } else {
170 self.repartition_evenly_by_size(target_partitions)?
171 };
172
173 if let Some(repartitioned) = maybe_repartitioned {
174 Ok(Some(Arc::new(Self::try_new(
175 &repartitioned,
176 self.original_schema(),
177 self.projection.clone(),
178 )?)))
179 } else {
180 Ok(None)
181 }
182 }
183
184 fn output_partitioning(&self) -> Partitioning {
185 Partitioning::UnknownPartitioning(self.partitions.len())
186 }
187
188 fn eq_properties(&self) -> EquivalenceProperties {
189 EquivalenceProperties::new_with_orderings(
190 Arc::clone(&self.projected_schema),
191 self.sort_information.clone(),
192 )
193 }
194
195 fn scheduling_type(&self) -> SchedulingType {
196 SchedulingType::Cooperative
197 }
198
199 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
200 if let Some(partition) = partition {
201 if let Some(batches) = self.partitions.get(partition) {
203 Ok(common::compute_record_batch_statistics(
204 from_ref(batches),
205 &self.schema,
206 self.projection.clone(),
207 ))
208 } else {
209 Ok(Statistics::new_unknown(&self.projected_schema))
211 }
212 } else {
213 Ok(common::compute_record_batch_statistics(
215 &self.partitions,
216 &self.schema,
217 self.projection.clone(),
218 ))
219 }
220 }
221
222 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
223 let source = self.clone();
224 Some(Arc::new(source.with_limit(limit)))
225 }
226
227 fn fetch(&self) -> Option<usize> {
228 self.fetch
229 }
230
231 fn try_swapping_with_projection(
232 &self,
233 projection: &ProjectionExprs,
234 ) -> Result<Option<Arc<dyn DataSource>>> {
235 let exprs = projection.iter().cloned().collect_vec();
238 all_alias_free_columns(exprs.as_slice())
239 .then(|| {
240 let all_projections = (0..self.schema.fields().len()).collect();
241 let new_projections = new_projections_for_columns(
242 &exprs,
243 self.projection().as_ref().unwrap_or(&all_projections),
244 );
245
246 MemorySourceConfig::try_new(
247 self.partitions(),
248 self.original_schema(),
249 Some(new_projections),
250 )
251 .map(|s| Arc::new(s) as Arc<dyn DataSource>)
252 })
253 .transpose()
254 }
255}
256
257impl MemorySourceConfig {
258 pub fn try_new(
261 partitions: &[Vec<RecordBatch>],
262 schema: SchemaRef,
263 projection: Option<Vec<usize>>,
264 ) -> Result<Self> {
265 let projected_schema = project_schema(&schema, projection.as_ref())?;
266 Ok(Self {
267 partitions: partitions.to_vec(),
268 schema,
269 projected_schema,
270 projection,
271 sort_information: vec![],
272 show_sizes: true,
273 fetch: None,
274 })
275 }
276
277 pub fn try_new_exec(
280 partitions: &[Vec<RecordBatch>],
281 schema: SchemaRef,
282 projection: Option<Vec<usize>>,
283 ) -> Result<Arc<DataSourceExec>> {
284 let source = Self::try_new(partitions, schema, projection)?;
285 Ok(DataSourceExec::from_data_source(source))
286 }
287
288 #[expect(clippy::needless_pass_by_value)]
290 pub fn try_new_as_values(
291 schema: SchemaRef,
292 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
293 ) -> Result<Arc<DataSourceExec>> {
294 if data.is_empty() {
295 return plan_err!("Values list cannot be empty");
296 }
297
298 let n_row = data.len();
299 let n_col = schema.fields().len();
300
301 let placeholder_schema = Arc::new(Schema::empty());
304 let placeholder_batch = RecordBatch::try_new_with_options(
305 Arc::clone(&placeholder_schema),
306 vec![],
307 &RecordBatchOptions::new().with_row_count(Some(1)),
308 )?;
309
310 let arrays = (0..n_col)
312 .map(|j| {
313 (0..n_row)
314 .map(|i| {
315 let expr = &data[i][j];
316 let result = expr.evaluate(&placeholder_batch)?;
317
318 match result {
319 ColumnarValue::Scalar(scalar) => Ok(scalar),
320 ColumnarValue::Array(array) if array.len() == 1 => {
321 ScalarValue::try_from_array(&array, 0)
322 }
323 ColumnarValue::Array(_) => {
324 plan_err!("Cannot have array values in a values list")
325 }
326 }
327 })
328 .collect::<Result<Vec<_>>>()
329 .and_then(ScalarValue::iter_to_array)
330 })
331 .collect::<Result<Vec<_>>>()?;
332
333 let batch = RecordBatch::try_new_with_options(
334 Arc::clone(&schema),
335 arrays,
336 &RecordBatchOptions::new().with_row_count(Some(n_row)),
337 )?;
338
339 let partitions = vec![batch];
340 Self::try_new_from_batches(Arc::clone(&schema), partitions)
341 }
342
343 #[expect(clippy::needless_pass_by_value)]
348 pub fn try_new_from_batches(
349 schema: SchemaRef,
350 batches: Vec<RecordBatch>,
351 ) -> Result<Arc<DataSourceExec>> {
352 if batches.is_empty() {
353 return plan_err!("Values list cannot be empty");
354 }
355
356 for batch in &batches {
357 let batch_schema = batch.schema();
358 if batch_schema != schema {
359 return plan_err!(
360 "Batch has invalid schema. Expected: {}, got: {}",
361 schema,
362 batch_schema
363 );
364 }
365 }
366
367 let partitions = vec![batches];
368 let source = Self {
369 partitions,
370 schema: Arc::clone(&schema),
371 projected_schema: Arc::clone(&schema),
372 projection: None,
373 sort_information: vec![],
374 show_sizes: true,
375 fetch: None,
376 };
377 Ok(DataSourceExec::from_data_source(source))
378 }
379
380 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
382 self.fetch = limit;
383 self
384 }
385
386 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
388 self.show_sizes = show_sizes;
389 self
390 }
391
392 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
394 &self.partitions
395 }
396
397 pub fn projection(&self) -> &Option<Vec<usize>> {
399 &self.projection
400 }
401
402 pub fn show_sizes(&self) -> bool {
404 self.show_sizes
405 }
406
407 pub fn sort_information(&self) -> &[LexOrdering] {
409 &self.sort_information
410 }
411
412 pub fn try_with_sort_information(
432 mut self,
433 mut sort_information: Vec<LexOrdering>,
434 ) -> Result<Self> {
435 let fields = self.schema.fields();
437 let ambiguous_column = sort_information
438 .iter()
439 .flat_map(|ordering| ordering.clone())
440 .flat_map(|expr| collect_columns(&expr.expr))
441 .find(|col| {
442 fields
443 .get(col.index())
444 .map(|field| field.name() != col.name())
445 .unwrap_or(true)
446 });
447 assert_or_internal_err!(
448 ambiguous_column.is_none(),
449 "Column {:?} is not found in the original schema of the MemorySourceConfig",
450 ambiguous_column.as_ref().unwrap()
451 );
452
453 if self.projection.is_some() {
455 sort_information =
456 project_orderings(&sort_information, &self.projected_schema);
457 }
458
459 self.sort_information = sort_information;
460 Ok(self)
461 }
462
463 pub fn original_schema(&self) -> SchemaRef {
465 Arc::clone(&self.schema)
466 }
467
468 fn repartition_preserving_order(
474 &self,
475 target_partitions: usize,
476 output_ordering: LexOrdering,
477 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
478 if !self.eq_properties().ordering_satisfy(output_ordering)? {
479 Ok(None)
480 } else {
481 let total_num_batches =
482 self.partitions.iter().map(|b| b.len()).sum::<usize>();
483 if total_num_batches < target_partitions {
484 return Ok(None);
486 }
487
488 let cnt_to_repartition = target_partitions - self.partitions.len();
489
490 let to_repartition = self
493 .partitions
494 .iter()
495 .enumerate()
496 .map(|(idx, batches)| RePartition {
497 idx: idx + (cnt_to_repartition * idx), row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
499 batches: batches.clone(),
500 })
501 .collect_vec();
502
503 let mut max_heap = BinaryHeap::with_capacity(target_partitions);
506 for rep in to_repartition {
507 max_heap.push(CompareByRowCount(rep));
508 }
509
510 let mut cannot_split_further = Vec::with_capacity(target_partitions);
513 for _ in 0..cnt_to_repartition {
514 loop {
516 let Some(to_split) = max_heap.pop() else {
518 break;
520 };
521
522 let mut new_partitions = to_split.into_inner().split();
524 if new_partitions.len() > 1 {
525 for new_partition in new_partitions {
526 max_heap.push(CompareByRowCount(new_partition));
527 }
528 break;
530 } else {
531 cannot_split_further.push(new_partitions.remove(0));
532 }
533 }
534 }
535 let mut partitions = max_heap
536 .drain()
537 .map(CompareByRowCount::into_inner)
538 .collect_vec();
539 partitions.extend(cannot_split_further);
540
541 partitions.sort_by_key(|p| p.idx);
544 let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
545
546 Ok(Some(partitions))
547 }
548 }
549
550 fn repartition_evenly_by_size(
559 &self,
560 target_partitions: usize,
561 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
562 let mut flatten_batches =
564 self.partitions.clone().into_iter().flatten().collect_vec();
565 if flatten_batches.len() < target_partitions {
566 return Ok(None);
567 }
568
569 let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
571 flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
573
574 let mut partitions =
576 vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
577 let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
578 let mut total_rows_seen = 0;
579 let mut curr_bin_row_count = 0;
580 let mut idx = 0;
581 for batch in flatten_batches {
582 let row_cnt = batch.num_rows();
583 idx = std::cmp::min(idx, target_partitions - 1);
584
585 partitions[idx].push(batch);
586 curr_bin_row_count += row_cnt;
587 total_rows_seen += row_cnt;
588
589 if curr_bin_row_count >= target_partition_size {
590 idx += 1;
591 curr_bin_row_count = 0;
592
593 if total_rows_seen < total_num_rows {
596 target_partition_size = (total_num_rows - total_rows_seen)
597 .div_ceil(target_partitions - idx);
598 }
599 }
600 }
601
602 Ok(Some(partitions))
603 }
604}
605
/// A partition tracked while repartitioning with the order preserved.
struct RePartition {
    /// Position used to sort the partitions once splitting is finished.
    idx: usize,
    /// Total number of rows over all `batches`.
    row_count: usize,
    /// The record batches belonging to this partition.
    batches: Vec<RecordBatch>,
}
618
619impl RePartition {
620 fn split(self) -> Vec<Self> {
624 if self.batches.len() == 1 {
625 return vec![self];
626 }
627
628 let new_0 = RePartition {
629 idx: self.idx, row_count: 0,
631 batches: vec![],
632 };
633 let new_1 = RePartition {
634 idx: self.idx + 1, row_count: 0,
636 batches: vec![],
637 };
638 let split_pt = self.row_count / 2;
639
640 let [new_0, new_1] = self.batches.into_iter().fold(
641 [new_0, new_1],
642 |[mut new0, mut new1], batch| {
643 if new0.row_count < split_pt {
644 new0.add_batch(batch);
645 } else {
646 new1.add_batch(batch);
647 }
648 [new0, new1]
649 },
650 );
651 vec![new_0, new_1]
652 }
653
654 fn add_batch(&mut self, batch: RecordBatch) {
655 self.row_count += batch.num_rows();
656 self.batches.push(batch);
657 }
658}
659
660impl fmt::Display for RePartition {
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 write!(
663 f,
664 "{}rows-in-{}batches@{}",
665 self.row_count,
666 self.batches.len(),
667 self.idx
668 )
669 }
670}
671
/// Wrapper around [`RePartition`] whose ordering and equality only consider
/// the partition's `row_count`.
struct CompareByRowCount(RePartition);
673impl CompareByRowCount {
674 fn into_inner(self) -> RePartition {
675 self.0
676 }
677}
678impl Ord for CompareByRowCount {
679 fn cmp(&self, other: &Self) -> Ordering {
680 self.0.row_count.cmp(&other.0.row_count)
681 }
682}
impl PartialOrd for CompareByRowCount {
    /// Delegates to [`Ord::cmp`], so the partial order is always defined and
    /// consistent with the total order.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
688impl PartialEq for CompareByRowCount {
689 fn eq(&self, other: &Self) -> bool {
690 self.cmp(other) == Ordering::Equal
692 }
693}
// Marker impl; equality is defined by the `PartialEq` impl via `cmp`.
impl Eq for CompareByRowCount {}
695impl Deref for CompareByRowCount {
696 type Target = RePartition;
697 fn deref(&self) -> &Self::Target {
698 &self.0
699 }
700}
701
/// The record batches of one partition, shared behind an [`Arc`] and guarded
/// by an async (`tokio`) [`RwLock`].
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
704
/// Sink that appends incoming record batches to in-memory partitions.
pub struct MemSink {
    /// Target partitions that written batches are appended to.
    batches: Vec<PartitionData>,
    /// Schema returned by `DataSink::schema`.
    schema: SchemaRef,
}
713
714impl Debug for MemSink {
715 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716 f.debug_struct("MemSink")
717 .field("num_partitions", &self.batches.len())
718 .finish()
719 }
720}
721
722impl DisplayAs for MemSink {
723 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
724 match t {
725 DisplayFormatType::Default | DisplayFormatType::Verbose => {
726 let partition_count = self.batches.len();
727 write!(f, "MemoryTable (partitions={partition_count})")
728 }
729 DisplayFormatType::TreeRender => {
730 write!(f, "")
732 }
733 }
734 }
735}
736
737impl MemSink {
738 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
742 if batches.is_empty() {
743 return plan_err!("Cannot insert into MemTable with zero partitions");
744 }
745 Ok(Self { batches, schema })
746 }
747}
748
749#[async_trait]
750impl DataSink for MemSink {
751 fn as_any(&self) -> &dyn Any {
752 self
753 }
754
755 fn schema(&self) -> &SchemaRef {
756 &self.schema
757 }
758
759 async fn write_all(
760 &self,
761 mut data: SendableRecordBatchStream,
762 _context: &Arc<TaskContext>,
763 ) -> Result<u64> {
764 let num_partitions = self.batches.len();
765
766 let mut new_batches = vec![vec![]; num_partitions];
769 let mut i = 0;
770 let mut row_count = 0;
771 while let Some(batch) = data.next().await.transpose()? {
772 row_count += batch.num_rows();
773 new_batches[i].push(batch);
774 i = (i + 1) % num_partitions;
775 }
776
777 for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
779 target.write().await.append(&mut batches);
781 }
782
783 Ok(row_count as u64)
784 }
785}
786
/// Tests for the ordering information exposed by a memory-backed plan.
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Registers two orderings (`[a, b]` and `[c]`) on a source and checks
    /// that the plan reports `[a, b, c]` as output ordering and that both
    /// orderings are part of the ordering equivalence class.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1: LexOrdering = [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]
        .into();
        let sort2: LexOrdering = [PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]
        .into();
        // The expected output ordering is the concatenation of both orderings.
        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
843
/// Tests for constructing, limiting and repartitioning [`MemorySourceConfig`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::col;
    use crate::tests::{aggr_test_schema, make_partition};

    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_plan::expressions::lit;

    use datafusion_physical_plan::ExecutionPlan;
    use futures::StreamExt;

    /// A fetch limit of 4 set through `with_fetch` truncates the output of a
    /// partition that holds two batches.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// An empty values list is rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Batches matching the schema are accepted.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// An empty list of batches is rejected.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// Batches whose schema differs from the given schema are rejected.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    /// A non-nullable column accepts a non-null literal but rejects a null one.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // A null value must fail for the non-nullable column.
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics of a values list that contains only nulls.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let schema =
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
        let values = MemorySourceConfig::try_new_as_values(schema, data)?;

        assert_eq!(
            values.partition_statistics(None)?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    // Every row of the column is null.
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                    byte_size: Precision::Absent,
                },],
            }
        );

        Ok(())
    }

    /// Builds a batch with `row_size` rows and the columns `a` (Int32),
    /// `b` (Utf8) and `c` (Int64), all filled with constant values.
    fn batch(row_size: usize) -> RecordBatch {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    }

    /// Schema of the batches produced by [`batch`].
    fn schema() -> SchemaRef {
        batch(1).schema()
    }

    /// Source without any partitions.
    fn memorysrcconfig_no_partitions(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with one partition holding a single batch of 100 rows.
    fn memorysrcconfig_1_partition_1_batch(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with three partitions, each holding one batch of 100 rows.
    fn memorysrcconfig_3_partitions_1_batch_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with three partitions, each holding two batches of 100 rows.
    fn memorysrcconfig_3_partitions_with_2_batches_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with one partition whose batches shrink in size
    /// (100_000, 10_000, 100 and 1 rows).
    fn memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with one partition whose batches are not ordered by size
    /// (100_000, 1, 100 and 10_000 rows).
    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with two partitions holding batches of different sizes.
    fn memorysrcconfig_2_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100_000), batch(10_000), batch(1_000)],
            vec![batch(2_000), batch(20)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Source with two partitions that mix one very large batch with tiny
    /// and empty batches.
    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![
                batch(100_000),
                batch(1),
                batch(1),
                batch(1),
                batch(1),
                batch(0),
            ],
            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Asserts that `partitioned_datasrc` exists exactly when `partition_cnt`
    /// is `Some`, and that it then reports that many output partitions.
    fn assert_partitioning(
        partitioned_datasrc: Option<Arc<dyn DataSource>>,
        partition_cnt: Option<usize>,
    ) {
        let should_exist = if let Some(partition_cnt) = partition_cnt {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        } else {
            "new datasource should not exist".into()
        };

        let actual = partitioned_datasrc
            .map(|datasrc| datasrc.output_partitioning().partition_count());
        assert_eq!(
            actual, partition_cnt,
            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
        );
    }

    /// Scenarios that are expected to behave the same with and without
    /// ordering requirements.
    fn run_all_test_scenarios(
        output_ordering: Option<LexOrdering>,
        sort_information_on_config: Vec<LexOrdering>,
    ) -> Result<()> {
        let not_used = usize::MAX;

        // No partitions: nothing to repartition.
        let mem_src_config =
            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
        let partitioned_datasrc =
            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
        assert_partitioning(partitioned_datasrc, None);

        // Already has the target number of partitions (1).
        let target_partitions = 1;
        let mem_src_config =
            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Already has the target number of partitions (3).
        let target_partitions = 3;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Has more partitions than the target.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Fewer batches (3) than target partitions (4).
        let target_partitions = 4;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Six batches in three partitions can be spread over five partitions.
        let target_partitions = 5;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(5));

        // Six batches in three partitions can be spread over six partitions.
        let target_partitions = 6;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(6));

        // One more target partition than there are batches.
        let target_partitions = 3 * 2 + 1;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // One partition with batches of decreasing size, split in two.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
            sort_information_on_config,
        )?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            not_used,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        // Inspect the repartitioned batches directly.
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // The first partition holds only the largest batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // The second partition holds the remaining batches.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 1);

        Ok(())
    }

    /// Repartitioning without sort information and without a required
    /// output ordering.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // Scenarios shared with the order-preserving case.
        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

        // Without ordering requirements, batches may move between partitions
        // to balance the row counts.
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Call the size-based strategy directly to inspect the batches.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // Largest batch on its own.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second largest batch on its own.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 10_000);
        // All remaining batches, largest first.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 1_000);
        assert_eq!(p3[2].num_rows(), 20);

        Ok(())
    }

    /// Size-based repartitioning of very unevenly sized batches.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches()
    -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // Twelve batches (100_000, 100, eight with 1 row, two with 0 rows)
        // spread over five partitions.
        let target_partitions = 5;
        let mem_src_config =
            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Call the size-based strategy directly to inspect the batches.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 5);
        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
        else {
            unreachable!()
        };
        // Largest batch on its own.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second largest batch on its own.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 100);
        // Three single-row batches.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 1);
        assert_eq!(p3[1].num_rows(), 1);
        assert_eq!(p3[2].num_rows(), 1);
        // Three more single-row batches.
        assert_eq!(p4.len(), 3);
        assert_eq!(p4[0].num_rows(), 1);
        assert_eq!(p4[1].num_rows(), 1);
        assert_eq!(p4[2].num_rows(), 1);
        // The last partition takes everything that is left, including the
        // empty batches.
        assert_eq!(p5.len(), 4);
        assert_eq!(p5[0].num_rows(), 1);
        assert_eq!(p5[1].num_rows(), 1);
        assert_eq!(p5[2].num_rows(), 0);
        assert_eq!(p5[3].num_rows(), 0);

        Ok(())
    }

    /// Repartitioning with sort information and a matching required ordering.
    #[test]
    fn test_repartition_with_sort_information() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // Scenarios shared with the unordered case.
        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;

        // With an ordering, batches stay within their original partition.
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Call the order-preserving strategy directly to inspect the batches.
        let Some(output_ord) = output_ordering else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config
            .repartition_preserving_order(target_partitions, output_ord)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First half of the original first partition.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second half of the original first partition.
        assert_eq!(p2.len(), 2);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 1_000);
        // The original second partition is left untouched.
        assert_eq!(p3.len(), 2);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 20);

        Ok(())
    }

    /// Order-preserving repartitioning keeps the batch order even when the
    /// batches are not ordered by size.
    #[test]
    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // One partition with batches of 100_000, 1, 100 and 10_000 rows.
        let target_partitions = 2;
        let mem_src_config =
            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        // Inspect the repartitioned batches directly.
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // The first partition holds only the first (largest) batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // The remaining batches keep their original relative order.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 1);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 10_000);

        Ok(())
    }
}