1use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::fmt;
21use std::fmt::Debug;
22use std::ops::Deref;
23use std::slice::from_ref;
24use std::sync::Arc;
25
26use crate::sink::DataSink;
27use crate::source::{DataSource, DataSourceExec};
28
29use arrow::array::{RecordBatch, RecordBatchOptions};
30use arrow::datatypes::{Schema, SchemaRef};
31use datafusion_common::{
32 Result, ScalarValue, assert_or_internal_err, plan_err, project_schema,
33};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::equivalence::project_orderings;
36use datafusion_physical_expr::projection::ProjectionExprs;
37use datafusion_physical_expr::utils::collect_columns;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39use datafusion_physical_plan::memory::MemoryStream;
40use datafusion_physical_plan::projection::{
41 all_alias_free_columns, new_projections_for_columns,
42};
43use datafusion_physical_plan::{
44 ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
45 SendableRecordBatchStream, Statistics, common,
46};
47
48use async_trait::async_trait;
49use datafusion_physical_plan::coop::cooperative;
50use datafusion_physical_plan::execution_plan::SchedulingType;
51use futures::StreamExt;
52use itertools::Itertools;
53use tokio::sync::RwLock;
54
55#[derive(Clone, Debug)]
57pub struct MemorySourceConfig {
58 partitions: Vec<Vec<RecordBatch>>,
62 schema: SchemaRef,
64 projected_schema: SchemaRef,
66 projection: Option<Vec<usize>>,
68 sort_information: Vec<LexOrdering>,
70 show_sizes: bool,
72 fetch: Option<usize>,
75}
76
77impl DataSource for MemorySourceConfig {
78 fn open(
79 &self,
80 partition: usize,
81 _context: Arc<TaskContext>,
82 ) -> Result<SendableRecordBatchStream> {
83 Ok(Box::pin(cooperative(
84 MemoryStream::try_new(
85 self.partitions[partition].clone(),
86 Arc::clone(&self.projected_schema),
87 self.projection.clone(),
88 )?
89 .with_fetch(self.fetch),
90 )))
91 }
92
93 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
94 match t {
95 DisplayFormatType::Default | DisplayFormatType::Verbose => {
96 let partition_sizes: Vec<_> =
97 self.partitions.iter().map(|b| b.len()).collect();
98
99 let output_ordering = self
100 .sort_information
101 .first()
102 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
103 .unwrap_or_default();
104
105 let eq_properties = self.eq_properties();
106 let constraints = eq_properties.constraints();
107 let constraints = if constraints.is_empty() {
108 String::new()
109 } else {
110 format!(", {constraints}")
111 };
112
113 let limit = self
114 .fetch
115 .map_or(String::new(), |limit| format!(", fetch={limit}"));
116 if self.show_sizes {
117 write!(
118 f,
119 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
120 partition_sizes.len(),
121 )
122 } else {
123 write!(
124 f,
125 "partitions={}{limit}{output_ordering}{constraints}",
126 partition_sizes.len(),
127 )
128 }
129 }
130 DisplayFormatType::TreeRender => {
131 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
132 let total_bytes: usize = self
133 .partitions
134 .iter()
135 .flatten()
136 .map(|batch| batch.get_array_memory_size())
137 .sum();
138 writeln!(f, "format=memory")?;
139 writeln!(f, "rows={total_rows}")?;
140 writeln!(f, "bytes={total_bytes}")?;
141 Ok(())
142 }
143 }
144 }
145
146 fn repartitioned(
151 &self,
152 target_partitions: usize,
153 _repartition_file_min_size: usize,
154 output_ordering: Option<LexOrdering>,
155 ) -> Result<Option<Arc<dyn DataSource>>> {
156 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
157 {
159 return Ok(None);
160 }
161
162 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
163 self.repartition_preserving_order(target_partitions, output_ordering)?
164 } else {
165 self.repartition_evenly_by_size(target_partitions)?
166 };
167
168 if let Some(repartitioned) = maybe_repartitioned {
169 Ok(Some(Arc::new(Self::try_new(
170 &repartitioned,
171 self.original_schema(),
172 self.projection.clone(),
173 )?)))
174 } else {
175 Ok(None)
176 }
177 }
178
179 fn output_partitioning(&self) -> Partitioning {
180 Partitioning::UnknownPartitioning(self.partitions.len())
181 }
182
183 fn eq_properties(&self) -> EquivalenceProperties {
184 EquivalenceProperties::new_with_orderings(
185 Arc::clone(&self.projected_schema),
186 self.sort_information.clone(),
187 )
188 }
189
190 fn scheduling_type(&self) -> SchedulingType {
191 SchedulingType::Cooperative
192 }
193
194 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
195 if let Some(partition) = partition {
196 if let Some(batches) = self.partitions.get(partition) {
198 Ok(Arc::new(common::compute_record_batch_statistics(
199 from_ref(batches),
200 &self.schema,
201 self.projection.clone(),
202 )))
203 } else {
204 Ok(Arc::new(Statistics::new_unknown(&self.projected_schema)))
206 }
207 } else {
208 Ok(Arc::new(common::compute_record_batch_statistics(
210 &self.partitions,
211 &self.schema,
212 self.projection.clone(),
213 )))
214 }
215 }
216
217 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
218 let source = self.clone();
219 Some(Arc::new(source.with_limit(limit)))
220 }
221
222 fn fetch(&self) -> Option<usize> {
223 self.fetch
224 }
225
226 fn try_swapping_with_projection(
227 &self,
228 projection: &ProjectionExprs,
229 ) -> Result<Option<Arc<dyn DataSource>>> {
230 let exprs = projection.iter().cloned().collect_vec();
233 all_alias_free_columns(exprs.as_slice())
234 .then(|| {
235 let all_projections = (0..self.schema.fields().len()).collect();
236 let new_projections = new_projections_for_columns(
237 &exprs,
238 self.projection().as_ref().unwrap_or(&all_projections),
239 );
240 let projected_schema =
241 project_schema(&self.schema, Some(&new_projections));
242
243 projected_schema.map(|projected_schema| {
244 let mut new_source = self.clone();
247 new_source.projection = Some(new_projections);
248 new_source.projected_schema = projected_schema;
249 new_source.sort_information = project_orderings(
251 &new_source.sort_information,
252 &new_source.projected_schema,
253 );
254 Arc::new(new_source) as Arc<dyn DataSource>
255 })
256 })
257 .transpose()
258 }
259}
260
261impl MemorySourceConfig {
262 pub fn try_new(
265 partitions: &[Vec<RecordBatch>],
266 schema: SchemaRef,
267 projection: Option<Vec<usize>>,
268 ) -> Result<Self> {
269 let projected_schema = project_schema(&schema, projection.as_ref())?;
270 Ok(Self {
271 partitions: partitions.to_vec(),
272 schema,
273 projected_schema,
274 projection,
275 sort_information: vec![],
276 show_sizes: true,
277 fetch: None,
278 })
279 }
280
281 pub fn try_new_exec(
284 partitions: &[Vec<RecordBatch>],
285 schema: SchemaRef,
286 projection: Option<Vec<usize>>,
287 ) -> Result<Arc<DataSourceExec>> {
288 let source = Self::try_new(partitions, schema, projection)?;
289 Ok(DataSourceExec::from_data_source(source))
290 }
291
292 #[expect(clippy::needless_pass_by_value)]
294 pub fn try_new_as_values(
295 schema: SchemaRef,
296 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
297 ) -> Result<Arc<DataSourceExec>> {
298 if data.is_empty() {
299 return plan_err!("Values list cannot be empty");
300 }
301
302 let n_row = data.len();
303 let n_col = schema.fields().len();
304
305 let placeholder_schema = Arc::new(Schema::empty());
308 let placeholder_batch = RecordBatch::try_new_with_options(
309 Arc::clone(&placeholder_schema),
310 vec![],
311 &RecordBatchOptions::new().with_row_count(Some(1)),
312 )?;
313
314 let arrays = (0..n_col)
316 .map(|j| {
317 (0..n_row)
318 .map(|i| {
319 let expr = &data[i][j];
320 let result = expr.evaluate(&placeholder_batch)?;
321
322 match result {
323 ColumnarValue::Scalar(scalar) => Ok(scalar),
324 ColumnarValue::Array(array) if array.len() == 1 => {
325 ScalarValue::try_from_array(&array, 0)
326 }
327 ColumnarValue::Array(_) => {
328 plan_err!("Cannot have array values in a values list")
329 }
330 }
331 })
332 .collect::<Result<Vec<_>>>()
333 .and_then(ScalarValue::iter_to_array)
334 })
335 .collect::<Result<Vec<_>>>()?;
336
337 let batch = RecordBatch::try_new_with_options(
338 Arc::clone(&schema),
339 arrays,
340 &RecordBatchOptions::new().with_row_count(Some(n_row)),
341 )?;
342
343 let partitions = vec![batch];
344 Self::try_new_from_batches(Arc::clone(&schema), partitions)
345 }
346
347 #[expect(clippy::needless_pass_by_value)]
352 pub fn try_new_from_batches(
353 schema: SchemaRef,
354 batches: Vec<RecordBatch>,
355 ) -> Result<Arc<DataSourceExec>> {
356 if batches.is_empty() {
357 return plan_err!("Values list cannot be empty");
358 }
359
360 for batch in &batches {
361 let batch_schema = batch.schema();
362 if batch_schema != schema {
363 return plan_err!(
364 "Batch has invalid schema. Expected: {}, got: {}",
365 schema,
366 batch_schema
367 );
368 }
369 }
370
371 let partitions = vec![batches];
372 let source = Self {
373 partitions,
374 schema: Arc::clone(&schema),
375 projected_schema: Arc::clone(&schema),
376 projection: None,
377 sort_information: vec![],
378 show_sizes: true,
379 fetch: None,
380 };
381 Ok(DataSourceExec::from_data_source(source))
382 }
383
384 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
386 self.fetch = limit;
387 self
388 }
389
390 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
392 self.show_sizes = show_sizes;
393 self
394 }
395
396 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
398 &self.partitions
399 }
400
401 pub fn projection(&self) -> &Option<Vec<usize>> {
403 &self.projection
404 }
405
406 pub fn show_sizes(&self) -> bool {
408 self.show_sizes
409 }
410
411 pub fn sort_information(&self) -> &[LexOrdering] {
413 &self.sort_information
414 }
415
416 pub fn try_with_sort_information(
436 mut self,
437 mut sort_information: Vec<LexOrdering>,
438 ) -> Result<Self> {
439 let fields = self.schema.fields();
441 let ambiguous_column = sort_information
442 .iter()
443 .flat_map(|ordering| ordering.clone())
444 .flat_map(|expr| collect_columns(&expr.expr))
445 .find(|col| {
446 fields
447 .get(col.index())
448 .map(|field| field.name() != col.name())
449 .unwrap_or(true)
450 });
451 assert_or_internal_err!(
452 ambiguous_column.is_none(),
453 "Column {:?} is not found in the original schema of the MemorySourceConfig",
454 ambiguous_column.as_ref().unwrap()
455 );
456
457 if self.projection.is_some() {
459 sort_information =
460 project_orderings(&sort_information, &self.projected_schema);
461 }
462
463 self.sort_information = sort_information;
464 Ok(self)
465 }
466
467 pub fn original_schema(&self) -> SchemaRef {
469 Arc::clone(&self.schema)
470 }
471
472 fn repartition_preserving_order(
478 &self,
479 target_partitions: usize,
480 output_ordering: LexOrdering,
481 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
482 if !self.eq_properties().ordering_satisfy(output_ordering)? {
483 Ok(None)
484 } else {
485 let total_num_batches =
486 self.partitions.iter().map(|b| b.len()).sum::<usize>();
487 if total_num_batches < target_partitions {
488 return Ok(None);
490 }
491
492 let cnt_to_repartition = target_partitions - self.partitions.len();
493
494 let to_repartition = self
497 .partitions
498 .iter()
499 .enumerate()
500 .map(|(idx, batches)| RePartition {
501 idx: idx + (cnt_to_repartition * idx), row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
503 batches: batches.clone(),
504 })
505 .collect_vec();
506
507 let mut max_heap = BinaryHeap::with_capacity(target_partitions);
510 for rep in to_repartition {
511 max_heap.push(CompareByRowCount(rep));
512 }
513
514 let mut cannot_split_further = Vec::with_capacity(target_partitions);
517 for _ in 0..cnt_to_repartition {
518 loop {
520 let Some(to_split) = max_heap.pop() else {
522 break;
524 };
525
526 let mut new_partitions = to_split.into_inner().split();
528 if new_partitions.len() > 1 {
529 for new_partition in new_partitions {
530 max_heap.push(CompareByRowCount(new_partition));
531 }
532 break;
534 } else {
535 cannot_split_further.push(new_partitions.remove(0));
536 }
537 }
538 }
539 let mut partitions = max_heap
540 .drain()
541 .map(CompareByRowCount::into_inner)
542 .collect_vec();
543 partitions.extend(cannot_split_further);
544
545 partitions.sort_by_key(|p| p.idx);
548 let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
549
550 Ok(Some(partitions))
551 }
552 }
553
554 fn repartition_evenly_by_size(
563 &self,
564 target_partitions: usize,
565 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
566 let mut flatten_batches =
568 self.partitions.clone().into_iter().flatten().collect_vec();
569 if flatten_batches.len() < target_partitions {
570 return Ok(None);
571 }
572
573 let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
575 flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
577
578 let mut partitions =
580 vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
581 let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
582 let mut total_rows_seen = 0;
583 let mut curr_bin_row_count = 0;
584 let mut idx = 0;
585 for batch in flatten_batches {
586 let row_cnt = batch.num_rows();
587 idx = std::cmp::min(idx, target_partitions - 1);
588
589 partitions[idx].push(batch);
590 curr_bin_row_count += row_cnt;
591 total_rows_seen += row_cnt;
592
593 if curr_bin_row_count >= target_partition_size {
594 idx += 1;
595 curr_bin_row_count = 0;
596
597 if total_rows_seen < total_num_rows {
600 target_partition_size = (total_num_rows - total_rows_seen)
601 .div_ceil(target_partitions - idx);
602 }
603 }
604 }
605
606 Ok(Some(partitions))
607 }
608}
609
610struct RePartition {
614 idx: usize,
616 row_count: usize,
619 batches: Vec<RecordBatch>,
621}
622
623impl RePartition {
624 fn split(self) -> Vec<Self> {
628 if self.batches.len() == 1 {
629 return vec![self];
630 }
631
632 let new_0 = RePartition {
633 idx: self.idx, row_count: 0,
635 batches: vec![],
636 };
637 let new_1 = RePartition {
638 idx: self.idx + 1, row_count: 0,
640 batches: vec![],
641 };
642 let split_pt = self.row_count / 2;
643
644 let [new_0, new_1] = self.batches.into_iter().fold(
645 [new_0, new_1],
646 |[mut new0, mut new1], batch| {
647 if new0.row_count < split_pt {
648 new0.add_batch(batch);
649 } else {
650 new1.add_batch(batch);
651 }
652 [new0, new1]
653 },
654 );
655 vec![new_0, new_1]
656 }
657
658 fn add_batch(&mut self, batch: RecordBatch) {
659 self.row_count += batch.num_rows();
660 self.batches.push(batch);
661 }
662}
663
664impl fmt::Display for RePartition {
665 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
666 write!(
667 f,
668 "{}rows-in-{}batches@{}",
669 self.row_count,
670 self.batches.len(),
671 self.idx
672 )
673 }
674}
675
676struct CompareByRowCount(RePartition);
677impl CompareByRowCount {
678 fn into_inner(self) -> RePartition {
679 self.0
680 }
681}
682impl Ord for CompareByRowCount {
683 fn cmp(&self, other: &Self) -> Ordering {
684 self.0.row_count.cmp(&other.0.row_count)
685 }
686}
687impl PartialOrd for CompareByRowCount {
688 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
689 Some(self.cmp(other))
690 }
691}
692impl PartialEq for CompareByRowCount {
693 fn eq(&self, other: &Self) -> bool {
694 self.cmp(other) == Ordering::Equal
696 }
697}
698impl Eq for CompareByRowCount {}
699impl Deref for CompareByRowCount {
700 type Target = RePartition;
701 fn deref(&self) -> &Self::Target {
702 &self.0
703 }
704}
705
706pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
708
709pub struct MemSink {
713 batches: Vec<PartitionData>,
715 schema: SchemaRef,
716}
717
718impl Debug for MemSink {
719 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720 f.debug_struct("MemSink")
721 .field("num_partitions", &self.batches.len())
722 .finish()
723 }
724}
725
726impl DisplayAs for MemSink {
727 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728 match t {
729 DisplayFormatType::Default | DisplayFormatType::Verbose => {
730 let partition_count = self.batches.len();
731 write!(f, "MemoryTable (partitions={partition_count})")
732 }
733 DisplayFormatType::TreeRender => {
734 write!(f, "")
736 }
737 }
738 }
739}
740
741impl MemSink {
742 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
746 if batches.is_empty() {
747 return plan_err!("Cannot insert into MemTable with zero partitions");
748 }
749 Ok(Self { batches, schema })
750 }
751}
752
753#[async_trait]
754impl DataSink for MemSink {
755 fn schema(&self) -> &SchemaRef {
756 &self.schema
757 }
758
759 async fn write_all(
760 &self,
761 mut data: SendableRecordBatchStream,
762 _context: &Arc<TaskContext>,
763 ) -> Result<u64> {
764 let num_partitions = self.batches.len();
765
766 let mut new_batches = vec![vec![]; num_partitions];
769 let mut i = 0;
770 let mut row_count = 0;
771 while let Some(batch) = data.next().await.transpose()? {
772 row_count += batch.num_rows();
773 new_batches[i].push(batch);
774 i = (i + 1) % num_partitions;
775 }
776
777 for (target, mut batches) in self.batches.iter().zip(new_batches) {
779 target.write().await.append(&mut batches);
781 }
782
783 Ok(row_count as u64)
784 }
785}
786
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Two orderings supplied as sort information must both be present in
    /// the ordering equivalence class, and the reported output ordering is
    /// their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));

        // Default-options sort expression on the named column.
        let sort_on = |name: &str| -> Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };
        let sort1: LexOrdering = [sort_on("a")?, sort_on("b")?].into();
        let sort2: LexOrdering = [sort_on("c")?].into();

        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        for ordering in [&sort1, &sort2] {
            assert!(eq_properties.oeq_class().contains(ordering));
        }
        Ok(())
    }
}
843
844#[cfg(test)]
845mod tests {
846 use super::*;
847 use crate::test_util::col;
848 use crate::tests::{aggr_test_schema, make_partition};
849
850 use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
851 use arrow::datatypes::{DataType, Field};
852 use datafusion_common::assert_batches_eq;
853 use datafusion_common::stats::{ColumnStatistics, Precision};
854 use datafusion_physical_expr::PhysicalSortExpr;
855 use datafusion_physical_plan::expressions::lit;
856
857 use datafusion_physical_plan::ExecutionPlan;
858
859 #[tokio::test]
860 async fn exec_with_limit() -> Result<()> {
861 let task_ctx = Arc::new(TaskContext::default());
862 let batch = make_partition(7);
863 let schema = batch.schema();
864 let batches = vec![batch.clone(), batch];
865
866 let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
867 assert_eq!(exec.fetch(), None);
868
869 let exec = exec.with_fetch(Some(4)).unwrap();
870 assert_eq!(exec.fetch(), Some(4));
871
872 let mut it = exec.execute(0, task_ctx)?;
873 let mut results = vec![];
874 while let Some(batch) = it.next().await {
875 results.push(batch?);
876 }
877
878 let expected = [
879 "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
880 ];
881 assert_batches_eq!(expected, &results);
882 Ok(())
883 }
884
/// Pushing a projection into the source must keep its fetch limit.
#[test]
fn try_swapping_with_projection_preserves_fetch() {
    use datafusion_physical_expr::projection::ProjectionExprs;

    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Int64, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch(10)]];

    let source = MemorySourceConfig::try_new(&partitions, schema.clone(), None)
        .unwrap()
        .with_limit(Some(5));
    assert_eq!(source.fetch, Some(5));

    let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
    let swapped = source
        .try_swapping_with_projection(&projection)
        .unwrap()
        .unwrap();
    let new_source = swapped.downcast_ref::<MemorySourceConfig>().unwrap();

    assert_eq!(
        new_source.fetch,
        Some(5),
        "fetch limit must be preserved after projection pushdown"
    );
}
917
918 #[tokio::test]
919 async fn values_empty_case() -> Result<()> {
920 let schema = aggr_test_schema();
921 let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
922 assert!(empty.is_err());
923 Ok(())
924 }
925
/// Batches matching the schema are accepted.
#[test]
fn new_exec_with_batches() {
    let partition = make_partition(7);
    let schema = partition.schema();
    let batches = vec![partition.clone(), partition];
    let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
}
933
/// An empty batch list is rejected.
#[test]
fn new_exec_with_batches_empty() {
    let schema = make_partition(7).schema();
    let no_batches = Vec::new();
    let _ = MemorySourceConfig::try_new_from_batches(schema, no_batches).unwrap_err();
}
940
/// Batches whose schema differs from the requested one are rejected.
#[test]
fn new_exec_with_batches_invalid_schema() {
    let partition = make_partition(7);
    let batches = vec![partition.clone(), partition];

    let invalid_fields = vec![
        Field::new("col0", DataType::UInt32, false),
        Field::new("col1", DataType::Utf8, false),
    ];
    let invalid_schema = Arc::new(Schema::new(invalid_fields));

    let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
        .unwrap_err();
}
953
/// With a non-nullable column, a non-null literal is accepted and a null
/// literal is rejected.
#[test]
fn new_exec_with_non_nullable_schema() {
    let non_nullable = Field::new("col0", DataType::UInt32, false);
    let schema = Arc::new(Schema::new(vec![non_nullable]));

    let non_null_row = vec![vec![lit(1u32)]];
    let _ = MemorySourceConfig::try_new_as_values(Arc::clone(&schema), non_null_row)
        .unwrap();

    let null_row = vec![vec![lit(ScalarValue::UInt32(None))]];
    let _ = MemorySourceConfig::try_new_as_values(schema, null_row).unwrap_err();
}
974
/// Statistics of a values list made only of nulls report every row as null.
#[test]
fn values_stats_with_nulls_only() -> Result<()> {
    let data = (0..3)
        .map(|_| vec![lit(ScalarValue::Null)])
        .collect::<Vec<_>>();
    let rows = data.len();
    let schema =
        Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
    let values = MemorySourceConfig::try_new_as_values(schema, data)?;

    let expected = Statistics {
        num_rows: Precision::Exact(rows),
        total_byte_size: Precision::Exact(8),
        column_statistics: vec![ColumnStatistics {
            // Every row of the single column is null.
            null_count: Precision::Exact(rows),
            distinct_count: Precision::Absent,
            max_value: Precision::Absent,
            min_value: Precision::Absent,
            sum_value: Precision::Absent,
            byte_size: Precision::Absent,
        }],
    };
    assert_eq!(*values.partition_statistics(None)?, expected);

    Ok(())
}
1005
1006 fn batch(row_size: usize) -> RecordBatch {
1007 let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
1008 let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
1009 let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
1010 RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
1011 }
1012
1013 fn schema() -> SchemaRef {
1014 batch(1).schema()
1015 }
1016
1017 fn memorysrcconfig_no_partitions(
1018 sort_information: Vec<LexOrdering>,
1019 ) -> Result<MemorySourceConfig> {
1020 let partitions = vec![];
1021 MemorySourceConfig::try_new(&partitions, schema(), None)?
1022 .try_with_sort_information(sort_information)
1023 }
1024
1025 fn memorysrcconfig_1_partition_1_batch(
1026 sort_information: Vec<LexOrdering>,
1027 ) -> Result<MemorySourceConfig> {
1028 let partitions = vec![vec![batch(100)]];
1029 MemorySourceConfig::try_new(&partitions, schema(), None)?
1030 .try_with_sort_information(sort_information)
1031 }
1032
1033 fn memorysrcconfig_3_partitions_1_batch_each(
1034 sort_information: Vec<LexOrdering>,
1035 ) -> Result<MemorySourceConfig> {
1036 let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
1037 MemorySourceConfig::try_new(&partitions, schema(), None)?
1038 .try_with_sort_information(sort_information)
1039 }
1040
1041 fn memorysrcconfig_3_partitions_with_2_batches_each(
1042 sort_information: Vec<LexOrdering>,
1043 ) -> Result<MemorySourceConfig> {
1044 let partitions = vec![
1045 vec![batch(100), batch(100)],
1046 vec![batch(100), batch(100)],
1047 vec![batch(100), batch(100)],
1048 ];
1049 MemorySourceConfig::try_new(&partitions, schema(), None)?
1050 .try_with_sort_information(sort_information)
1051 }
1052
1053 fn memorysrcconfig_1_partition_with_different_sized_batches(
1056 sort_information: Vec<LexOrdering>,
1057 ) -> Result<MemorySourceConfig> {
1058 let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1059 MemorySourceConfig::try_new(&partitions, schema(), None)?
1060 .try_with_sort_information(sort_information)
1061 }
1062
1063 fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1067 sort_information: Vec<LexOrdering>,
1068 ) -> Result<MemorySourceConfig> {
1069 let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1070 MemorySourceConfig::try_new(&partitions, schema(), None)?
1071 .try_with_sort_information(sort_information)
1072 }
1073
1074 fn memorysrcconfig_2_partition_with_different_sized_batches(
1075 sort_information: Vec<LexOrdering>,
1076 ) -> Result<MemorySourceConfig> {
1077 let partitions = vec![
1078 vec![batch(100_000), batch(10_000), batch(1_000)],
1079 vec![batch(2_000), batch(20)],
1080 ];
1081 MemorySourceConfig::try_new(&partitions, schema(), None)?
1082 .try_with_sort_information(sort_information)
1083 }
1084
1085 fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1086 sort_information: Vec<LexOrdering>,
1087 ) -> Result<MemorySourceConfig> {
1088 let partitions = vec![
1089 vec![
1090 batch(100_000),
1091 batch(1),
1092 batch(1),
1093 batch(1),
1094 batch(1),
1095 batch(0),
1096 ],
1097 vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1098 ];
1099 MemorySourceConfig::try_new(&partitions, schema(), None)?
1100 .try_with_sort_information(sort_information)
1101 }
1102
1103 fn assert_partitioning(
1107 partitioned_datasrc: Option<Arc<dyn DataSource>>,
1108 partition_cnt: Option<usize>,
1109 ) {
1110 let should_exist = if let Some(partition_cnt) = partition_cnt {
1111 format!("new datasource should exist and have {partition_cnt:?} partitions")
1112 } else {
1113 "new datasource should not exist".into()
1114 };
1115
1116 let actual = partitioned_datasrc
1117 .map(|datasrc| datasrc.output_partitioning().partition_count());
1118 assert_eq!(
1119 actual, partition_cnt,
1120 "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1121 );
1122 }
1123
1124 fn run_all_test_scenarios(
1125 output_ordering: Option<LexOrdering>,
1126 sort_information_on_config: Vec<LexOrdering>,
1127 ) -> Result<()> {
1128 let not_used = usize::MAX;
1129
1130 let mem_src_config =
1132 memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1133 let partitioned_datasrc =
1134 mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1135 assert_partitioning(partitioned_datasrc, None);
1136
1137 let target_partitions = 1;
1139 let mem_src_config =
1140 memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1141 let partitioned_datasrc = mem_src_config.repartitioned(
1142 target_partitions,
1143 not_used,
1144 output_ordering.clone(),
1145 )?;
1146 assert_partitioning(partitioned_datasrc, None);
1147
1148 let target_partitions = 3;
1150 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1151 sort_information_on_config.clone(),
1152 )?;
1153 let partitioned_datasrc = mem_src_config.repartitioned(
1154 target_partitions,
1155 not_used,
1156 output_ordering.clone(),
1157 )?;
1158 assert_partitioning(partitioned_datasrc, None);
1159
1160 let target_partitions = 2;
1162 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1163 sort_information_on_config.clone(),
1164 )?;
1165 let partitioned_datasrc = mem_src_config.repartitioned(
1166 target_partitions,
1167 not_used,
1168 output_ordering.clone(),
1169 )?;
1170 assert_partitioning(partitioned_datasrc, None);
1171
1172 let target_partitions = 4;
1174 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1175 sort_information_on_config.clone(),
1176 )?;
1177 let partitioned_datasrc = mem_src_config.repartitioned(
1178 target_partitions,
1179 not_used,
1180 output_ordering.clone(),
1181 )?;
1182 assert_partitioning(partitioned_datasrc, None);
1183
1184 let target_partitions = 5;
1187 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1188 sort_information_on_config.clone(),
1189 )?;
1190 let partitioned_datasrc = mem_src_config.repartitioned(
1191 target_partitions,
1192 not_used,
1193 output_ordering.clone(),
1194 )?;
1195 assert_partitioning(partitioned_datasrc, Some(5));
1196
1197 let target_partitions = 6;
1200 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1201 sort_information_on_config.clone(),
1202 )?;
1203 let partitioned_datasrc = mem_src_config.repartitioned(
1204 target_partitions,
1205 not_used,
1206 output_ordering.clone(),
1207 )?;
1208 assert_partitioning(partitioned_datasrc, Some(6));
1209
1210 let target_partitions = 3 * 2 + 1;
1212 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1213 sort_information_on_config.clone(),
1214 )?;
1215 let partitioned_datasrc = mem_src_config.repartitioned(
1216 target_partitions,
1217 not_used,
1218 output_ordering.clone(),
1219 )?;
1220 assert_partitioning(partitioned_datasrc, None);
1221
1222 let target_partitions = 2;
1225 let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1226 sort_information_on_config,
1227 )?;
1228 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1229 target_partitions,
1230 not_used,
1231 output_ordering,
1232 )?;
1233 assert_partitioning(partitioned_datasrc.clone(), Some(2));
1234 let partitioned_datasrc = partitioned_datasrc.unwrap();
1237 let Some(mem_src_config) =
1238 partitioned_datasrc.downcast_ref::<MemorySourceConfig>()
1239 else {
1240 unreachable!()
1241 };
1242 let repartitioned_raw_batches = mem_src_config.partitions.clone();
1243 assert_eq!(repartitioned_raw_batches.len(), 2);
1244 let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1245 unreachable!()
1246 };
1247 assert_eq!(p1.len(), 1);
1249 assert_eq!(p1[0].num_rows(), 100_000);
1250 assert_eq!(p2.len(), 3);
1252 assert_eq!(p2[0].num_rows(), 10_000);
1253 assert_eq!(p2[1].num_rows(), 100);
1254 assert_eq!(p2[2].num_rows(), 1);
1255
1256 Ok(())
1257 }
1258
/// Without sort information or a requested ordering, repartitioning runs
/// the common scenarios and, on an even split, neither keeps partitions
/// separate nor preserves the batch order.
#[test]
fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
    let no_sort: Vec<LexOrdering> = Vec::new();
    let no_output_ordering: Option<LexOrdering> = None;

    run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

    // 2 partitions with 5 batches in total, split into 3 partitions.
    let target_partitions = 3;
    let mem_src_config =
        memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
    let partitioned_datasrc =
        mem_src_config.repartitioned(target_partitions, usize::MAX, no_output_ordering)?;
    assert_partitioning(partitioned_datasrc, Some(3));

    // Batches are re-binned by size: batch(2_000) now precedes batch(1_000).
    let repartitioned_raw_batches = mem_src_config
        .repartition_evenly_by_size(target_partitions)?
        .unwrap();
    let row_counts: Vec<Vec<usize>> = repartitioned_raw_batches
        .iter()
        .map(|partition| partition.iter().map(|b| b.num_rows()).collect())
        .collect();
    assert_eq!(
        row_counts,
        vec![vec![100_000], vec![10_000], vec![2_000, 1_000, 20]]
    );

    Ok(())
}
1302
/// Size-based repartitioning of the batches built by
/// `memorysrcconfig_2_partition_with_extreme_sized_batches` into five target
/// partitions, with no sort information and no requested output ordering.
#[test]
fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches()
-> Result<()> {
    let no_sort = vec![];
    let no_output_ordering = None;

    let target_partitions = 5;
    let mem_src_config =
        memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;

    // Through the `repartitioned` entry point only the partition count is checked.
    assert_partitioning(
        mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?,
        Some(5),
    );

    // Reduce every resulting partition to the row counts of its batches so the
    // whole layout can be compared in a single assertion.
    let rows_per_partition: Vec<Vec<usize>> = mem_src_config
        .repartition_evenly_by_size(target_partitions)?
        .unwrap()
        .iter()
        .map(|partition| partition.iter().map(|batch| batch.num_rows()).collect())
        .collect();

    // Expected layout: the 100_000-row and 100-row batches each sit alone,
    // and the 1-row and 0-row batches are spread over the last three
    // partitions.
    assert_eq!(
        rows_per_partition,
        vec![
            vec![100_000],
            vec![100],
            vec![1, 1, 1],
            vec![1, 1, 1],
            vec![1, 1, 0, 0],
        ]
    );

    Ok(())
}
1364
/// Repartitioning when the source declares a sort order on column `c` and the
/// caller requests that same ordering on output.
///
/// Runs the scenarios shared through `run_all_test_scenarios`, then checks how
/// `repartition_preserving_order` spreads the batches built by
/// `memorysrcconfig_2_partition_with_different_sized_batches` over three
/// target partitions.
#[test]
fn test_repartition_with_sort_information() -> Result<()> {
    let schema = schema();
    let sort_key: LexOrdering =
        [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
    // The same key serves as the source's sort information and as the
    // requested output ordering.
    let sort_information = vec![sort_key.clone()];

    // Scenarios that are also exercised by the unsorted variant of this test.
    run_all_test_scenarios(Some(sort_key.clone()), sort_information.clone())?;

    let target_partitions = 3;
    let mem_src_config =
        memorysrcconfig_2_partition_with_different_sized_batches(sort_information)?;

    // Through the `repartitioned` entry point only the partition count is checked.
    assert_partitioning(
        mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            Some(sort_key.clone()),
        )?,
        Some(3),
    );

    // The batch layout itself is inspected by invoking the order-preserving
    // strategy directly and reducing each partition to its row counts.
    let rows_per_partition: Vec<Vec<usize>> = mem_src_config
        .repartition_preserving_order(target_partitions, sort_key)?
        .unwrap()
        .iter()
        .map(|partition| partition.iter().map(|batch| batch.num_rows()).collect())
        .collect();

    // Expected layout: the 100_000-row batch alone, then two partitions of
    // two batches each, in exactly this order.
    assert_eq!(
        rows_per_partition,
        vec![vec![100_000], vec![10_000, 1_000], vec![2_000, 20]]
    );

    Ok(())
}
1412
/// Repartitioning a sorted source built by
/// `memorysrcconfig_1_partition_with_ordering_not_matching_size` into two
/// partitions, inspecting the batches stored in the repartitioned
/// `MemorySourceConfig` itself.
#[test]
fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
    let schema = schema();
    let sort_key: LexOrdering =
        [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
    let sort_information = vec![sort_key.clone()];

    let target_partitions = 2;
    let original_config =
        memorysrcconfig_1_partition_with_ordering_not_matching_size(sort_information)?;
    let repartitioned = original_config.clone().repartitioned(
        target_partitions,
        usize::MAX,
        Some(sort_key),
    )?;
    // The result is still needed below, so hand the assertion helper a clone.
    assert_partitioning(repartitioned.clone(), Some(2));

    // Recover the concrete config behind the returned data source so its
    // partitions can be read.
    let repartitioned = repartitioned.unwrap();
    let Some(repartitioned_config) =
        repartitioned.downcast_ref::<MemorySourceConfig>()
    else {
        unreachable!()
    };

    // Reduce every partition to the row counts of its batches.
    let rows_per_partition: Vec<Vec<usize>> = repartitioned_config
        .partitions
        .iter()
        .map(|partition| partition.iter().map(|batch| batch.num_rows()).collect())
        .collect();

    // Expected layout: the 100_000-row batch alone, and the remaining batches
    // in ascending row-count order (1, 100, 10_000) in the second partition.
    assert_eq!(rows_per_partition, vec![vec![100_000], vec![1, 100, 10_000]]);

    Ok(())
}
1456}