1use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Deref;
24use std::sync::Arc;
25
26use crate::sink::DataSink;
27use crate::source::{DataSource, DataSourceExec};
28
29use arrow::array::{RecordBatch, RecordBatchOptions};
30use arrow::datatypes::{Schema, SchemaRef};
31use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::equivalence::{
34 OrderingEquivalenceClass, ProjectionMapping,
35};
36use datafusion_physical_expr::expressions::Column;
37use datafusion_physical_expr::utils::collect_columns;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39use datafusion_physical_plan::memory::MemoryStream;
40use datafusion_physical_plan::projection::{
41 all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
42};
43use datafusion_physical_plan::{
44 common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
45 SendableRecordBatchStream, Statistics,
46};
47
48use async_trait::async_trait;
49use datafusion_physical_plan::coop::cooperative;
50use datafusion_physical_plan::execution_plan::SchedulingType;
51use futures::StreamExt;
52use itertools::Itertools;
53use tokio::sync::RwLock;
54
/// Configuration for a data source backed by in-memory [`RecordBatch`]es.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The batches to scan; the outer index is the partition.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema supplied at construction, before any projection is applied.
    schema: SchemaRef,
    /// Result of applying `projection` to `schema`.
    projected_schema: SchemaRef,
    /// Optional column indices to project.
    projection: Option<Vec<usize>>,
    /// Known output orderings; used together with `projected_schema` when
    /// building the equivalence properties.
    sort_information: Vec<LexOrdering>,
    /// Whether `fmt_as` also prints the number of batches per partition.
    show_sizes: bool,
    /// Optional limit handed to each partition's stream in `open`.
    fetch: Option<usize>,
}
76
77impl DataSource for MemorySourceConfig {
78 fn open(
79 &self,
80 partition: usize,
81 _context: Arc<TaskContext>,
82 ) -> Result<SendableRecordBatchStream> {
83 Ok(Box::pin(cooperative(
84 MemoryStream::try_new(
85 self.partitions[partition].clone(),
86 Arc::clone(&self.projected_schema),
87 self.projection.clone(),
88 )?
89 .with_fetch(self.fetch),
90 )))
91 }
92
    /// Returns `self` as `&dyn Any`, which allows downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
96
97 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
98 match t {
99 DisplayFormatType::Default | DisplayFormatType::Verbose => {
100 let partition_sizes: Vec<_> =
101 self.partitions.iter().map(|b| b.len()).collect();
102
103 let output_ordering = self
104 .sort_information
105 .first()
106 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
107 .unwrap_or_default();
108
109 let eq_properties = self.eq_properties();
110 let constraints = eq_properties.constraints();
111 let constraints = if constraints.is_empty() {
112 String::new()
113 } else {
114 format!(", {constraints}")
115 };
116
117 let limit = self
118 .fetch
119 .map_or(String::new(), |limit| format!(", fetch={limit}"));
120 if self.show_sizes {
121 write!(
122 f,
123 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
124 partition_sizes.len(),
125 )
126 } else {
127 write!(
128 f,
129 "partitions={}{limit}{output_ordering}{constraints}",
130 partition_sizes.len(),
131 )
132 }
133 }
134 DisplayFormatType::TreeRender => {
135 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
136 let total_bytes: usize = self
137 .partitions
138 .iter()
139 .flatten()
140 .map(|batch| batch.get_array_memory_size())
141 .sum();
142 writeln!(f, "format=memory")?;
143 writeln!(f, "rows={total_rows}")?;
144 writeln!(f, "bytes={total_bytes}")?;
145 Ok(())
146 }
147 }
148 }
149
150 fn repartitioned(
155 &self,
156 target_partitions: usize,
157 _repartition_file_min_size: usize,
158 output_ordering: Option<LexOrdering>,
159 ) -> Result<Option<Arc<dyn DataSource>>> {
160 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
161 {
163 return Ok(None);
164 }
165
166 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
167 self.repartition_preserving_order(target_partitions, output_ordering)?
168 } else {
169 self.repartition_evenly_by_size(target_partitions)?
170 };
171
172 if let Some(repartitioned) = maybe_repartitioned {
173 Ok(Some(Arc::new(Self::try_new(
174 &repartitioned,
175 self.original_schema(),
176 self.projection.clone(),
177 )?)))
178 } else {
179 Ok(None)
180 }
181 }
182
    /// Reports unknown partitioning with one output partition per stored partition.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }
186
187 fn eq_properties(&self) -> EquivalenceProperties {
188 EquivalenceProperties::new_with_orderings(
189 Arc::clone(&self.projected_schema),
190 self.sort_information.clone(),
191 )
192 }
193
    /// Always reports cooperative scheduling; `open` wraps its stream in `cooperative`.
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }
197
198 fn statistics(&self) -> Result<Statistics> {
199 Ok(common::compute_record_batch_statistics(
200 &self.partitions,
201 &self.schema,
202 self.projection.clone(),
203 ))
204 }
205
206 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
207 let source = self.clone();
208 Some(Arc::new(source.with_limit(limit)))
209 }
210
    /// Returns the configured fetch limit, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
214
215 fn try_swapping_with_projection(
216 &self,
217 projection: &[ProjectionExpr],
218 ) -> Result<Option<Arc<dyn DataSource>>> {
219 all_alias_free_columns(projection)
222 .then(|| {
223 let all_projections = (0..self.schema.fields().len()).collect();
224 let new_projections = new_projections_for_columns(
225 projection,
226 self.projection().as_ref().unwrap_or(&all_projections),
227 );
228
229 MemorySourceConfig::try_new(
230 self.partitions(),
231 self.original_schema(),
232 Some(new_projections),
233 )
234 .map(|s| Arc::new(s) as Arc<dyn DataSource>)
235 })
236 .transpose()
237 }
238}
239
240impl MemorySourceConfig {
241 pub fn try_new(
244 partitions: &[Vec<RecordBatch>],
245 schema: SchemaRef,
246 projection: Option<Vec<usize>>,
247 ) -> Result<Self> {
248 let projected_schema = project_schema(&schema, projection.as_ref())?;
249 Ok(Self {
250 partitions: partitions.to_vec(),
251 schema,
252 projected_schema,
253 projection,
254 sort_information: vec![],
255 show_sizes: true,
256 fetch: None,
257 })
258 }
259
260 pub fn try_new_exec(
263 partitions: &[Vec<RecordBatch>],
264 schema: SchemaRef,
265 projection: Option<Vec<usize>>,
266 ) -> Result<Arc<DataSourceExec>> {
267 let source = Self::try_new(partitions, schema, projection)?;
268 Ok(DataSourceExec::from_data_source(source))
269 }
270
271 pub fn try_new_as_values(
273 schema: SchemaRef,
274 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
275 ) -> Result<Arc<DataSourceExec>> {
276 if data.is_empty() {
277 return plan_err!("Values list cannot be empty");
278 }
279
280 let n_row = data.len();
281 let n_col = schema.fields().len();
282
283 let placeholder_schema = Arc::new(Schema::empty());
286 let placeholder_batch = RecordBatch::try_new_with_options(
287 Arc::clone(&placeholder_schema),
288 vec![],
289 &RecordBatchOptions::new().with_row_count(Some(1)),
290 )?;
291
292 let arrays = (0..n_col)
294 .map(|j| {
295 (0..n_row)
296 .map(|i| {
297 let expr = &data[i][j];
298 let result = expr.evaluate(&placeholder_batch)?;
299
300 match result {
301 ColumnarValue::Scalar(scalar) => Ok(scalar),
302 ColumnarValue::Array(array) if array.len() == 1 => {
303 ScalarValue::try_from_array(&array, 0)
304 }
305 ColumnarValue::Array(_) => {
306 plan_err!("Cannot have array values in a values list")
307 }
308 }
309 })
310 .collect::<Result<Vec<_>>>()
311 .and_then(ScalarValue::iter_to_array)
312 })
313 .collect::<Result<Vec<_>>>()?;
314
315 let batch = RecordBatch::try_new_with_options(
316 Arc::clone(&schema),
317 arrays,
318 &RecordBatchOptions::new().with_row_count(Some(n_row)),
319 )?;
320
321 let partitions = vec![batch];
322 Self::try_new_from_batches(Arc::clone(&schema), partitions)
323 }
324
325 pub fn try_new_from_batches(
330 schema: SchemaRef,
331 batches: Vec<RecordBatch>,
332 ) -> Result<Arc<DataSourceExec>> {
333 if batches.is_empty() {
334 return plan_err!("Values list cannot be empty");
335 }
336
337 for batch in &batches {
338 let batch_schema = batch.schema();
339 if batch_schema != schema {
340 return plan_err!(
341 "Batch has invalid schema. Expected: {}, got: {}",
342 schema,
343 batch_schema
344 );
345 }
346 }
347
348 let partitions = vec![batches];
349 let source = Self {
350 partitions,
351 schema: Arc::clone(&schema),
352 projected_schema: Arc::clone(&schema),
353 projection: None,
354 sort_information: vec![],
355 show_sizes: true,
356 fetch: None,
357 };
358 Ok(DataSourceExec::from_data_source(source))
359 }
360
    /// Sets the fetch limit and returns the updated configuration.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }
366
    /// Sets whether `fmt_as` prints per-partition sizes and returns the
    /// updated configuration.
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        self.show_sizes = show_sizes;
        self
    }
372
    /// Returns the stored batches, grouped by partition.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }
377
    /// Returns the optional projection (column indices).
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }
382
    /// Returns whether `fmt_as` prints per-partition sizes.
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }
387
    /// Returns the known orderings of this source.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }
392
393 pub fn try_with_sort_information(
413 mut self,
414 mut sort_information: Vec<LexOrdering>,
415 ) -> Result<Self> {
416 let fields = self.schema.fields();
418 let ambiguous_column = sort_information
419 .iter()
420 .flat_map(|ordering| ordering.clone())
421 .flat_map(|expr| collect_columns(&expr.expr))
422 .find(|col| {
423 fields
424 .get(col.index())
425 .map(|field| field.name() != col.name())
426 .unwrap_or(true)
427 });
428 if let Some(col) = ambiguous_column {
429 return internal_err!(
430 "Column {:?} is not found in the original schema of the MemorySourceConfig",
431 col
432 );
433 }
434
435 if let Some(projection) = &self.projection {
437 let base_schema = self.original_schema();
438 let proj_exprs = projection.iter().map(|idx| {
439 let name = base_schema.field(*idx).name();
440 (Arc::new(Column::new(name, *idx)) as _, name.to_string())
441 });
442 let projection_mapping =
443 ProjectionMapping::try_new(proj_exprs, &base_schema)?;
444 let base_eqp = EquivalenceProperties::new_with_orderings(
445 Arc::clone(&base_schema),
446 sort_information,
447 );
448 let proj_eqp =
449 base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
450 let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
451 sort_information = oeq_class.into();
452 }
453
454 self.sort_information = sort_information;
455 Ok(self)
456 }
457
    /// Returns a new reference to the schema before projection.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
462
463 fn repartition_preserving_order(
469 &self,
470 target_partitions: usize,
471 output_ordering: LexOrdering,
472 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
473 if !self.eq_properties().ordering_satisfy(output_ordering)? {
474 Ok(None)
475 } else {
476 let total_num_batches =
477 self.partitions.iter().map(|b| b.len()).sum::<usize>();
478 if total_num_batches < target_partitions {
479 return Ok(None);
481 }
482
483 let cnt_to_repartition = target_partitions - self.partitions.len();
484
485 let to_repartition = self
488 .partitions
489 .iter()
490 .enumerate()
491 .map(|(idx, batches)| RePartition {
492 idx: idx + (cnt_to_repartition * idx), row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
494 batches: batches.clone(),
495 })
496 .collect_vec();
497
498 let mut max_heap = BinaryHeap::with_capacity(target_partitions);
501 for rep in to_repartition {
502 max_heap.push(CompareByRowCount(rep));
503 }
504
505 let mut cannot_split_further = Vec::with_capacity(target_partitions);
508 for _ in 0..cnt_to_repartition {
509 loop {
511 let Some(to_split) = max_heap.pop() else {
513 break;
515 };
516
517 let mut new_partitions = to_split.into_inner().split();
519 if new_partitions.len() > 1 {
520 for new_partition in new_partitions {
521 max_heap.push(CompareByRowCount(new_partition));
522 }
523 break;
525 } else {
526 cannot_split_further.push(new_partitions.remove(0));
527 }
528 }
529 }
530 let mut partitions = max_heap
531 .drain()
532 .map(CompareByRowCount::into_inner)
533 .collect_vec();
534 partitions.extend(cannot_split_further);
535
536 partitions.sort_by_key(|p| p.idx);
539 let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
540
541 Ok(Some(partitions))
542 }
543 }
544
545 fn repartition_evenly_by_size(
554 &self,
555 target_partitions: usize,
556 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
557 let mut flatten_batches =
559 self.partitions.clone().into_iter().flatten().collect_vec();
560 if flatten_batches.len() < target_partitions {
561 return Ok(None);
562 }
563
564 let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
566 flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
568
569 let mut partitions =
571 vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
572 let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
573 let mut total_rows_seen = 0;
574 let mut curr_bin_row_count = 0;
575 let mut idx = 0;
576 for batch in flatten_batches {
577 let row_cnt = batch.num_rows();
578 idx = std::cmp::min(idx, target_partitions - 1);
579
580 partitions[idx].push(batch);
581 curr_bin_row_count += row_cnt;
582 total_rows_seen += row_cnt;
583
584 if curr_bin_row_count >= target_partition_size {
585 idx += 1;
586 curr_bin_row_count = 0;
587
588 if total_rows_seen < total_num_rows {
591 target_partition_size = (total_num_rows - total_rows_seen)
592 .div_ceil(target_partitions - idx);
593 }
594 }
595 }
596
597 Ok(Some(partitions))
598 }
599}
600
/// Bookkeeping for one partition while repartitioning with order preserved.
struct RePartition {
    /// Position used to sort the partitions once all splits are done.
    idx: usize,
    /// Total number of rows across `batches`.
    row_count: usize,
    /// The batches of this partition.
    batches: Vec<RecordBatch>,
}
613
614impl RePartition {
615 fn split(self) -> Vec<Self> {
619 if self.batches.len() == 1 {
620 return vec![self];
621 }
622
623 let new_0 = RePartition {
624 idx: self.idx, row_count: 0,
626 batches: vec![],
627 };
628 let new_1 = RePartition {
629 idx: self.idx + 1, row_count: 0,
631 batches: vec![],
632 };
633 let split_pt = self.row_count / 2;
634
635 let [new_0, new_1] = self.batches.into_iter().fold(
636 [new_0, new_1],
637 |[mut new0, mut new1], batch| {
638 if new0.row_count < split_pt {
639 new0.add_batch(batch);
640 } else {
641 new1.add_batch(batch);
642 }
643 [new0, new1]
644 },
645 );
646 vec![new_0, new_1]
647 }
648
649 fn add_batch(&mut self, batch: RecordBatch) {
650 self.row_count += batch.num_rows();
651 self.batches.push(batch);
652 }
653}
654
655impl fmt::Display for RePartition {
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 write!(
658 f,
659 "{}rows-in-{}batches@{}",
660 self.row_count,
661 self.batches.len(),
662 self.idx
663 )
664 }
665}
666
667struct CompareByRowCount(RePartition);
668impl CompareByRowCount {
669 fn into_inner(self) -> RePartition {
670 self.0
671 }
672}
673impl Ord for CompareByRowCount {
674 fn cmp(&self, other: &Self) -> Ordering {
675 self.0.row_count.cmp(&other.0.row_count)
676 }
677}
678impl PartialOrd for CompareByRowCount {
679 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680 Some(self.cmp(other))
681 }
682}
683impl PartialEq for CompareByRowCount {
684 fn eq(&self, other: &Self) -> bool {
685 self.cmp(other) == Ordering::Equal
687 }
688}
689impl Eq for CompareByRowCount {}
690impl Deref for CompareByRowCount {
691 type Target = RePartition;
692 fn deref(&self) -> &Self::Target {
693 &self.0
694 }
695}
696
/// Shared, lock-protected list of record batches; `MemSink` holds one per partition.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
699
/// Sink that appends incoming record batches to in-memory partitions.
pub struct MemSink {
    /// Write targets, one entry per partition.
    batches: Vec<PartitionData>,
    /// Schema returned by `DataSink::schema`.
    schema: SchemaRef,
}
708
709impl Debug for MemSink {
710 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711 f.debug_struct("MemSink")
712 .field("num_partitions", &self.batches.len())
713 .finish()
714 }
715}
716
717impl DisplayAs for MemSink {
718 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
719 match t {
720 DisplayFormatType::Default | DisplayFormatType::Verbose => {
721 let partition_count = self.batches.len();
722 write!(f, "MemoryTable (partitions={partition_count})")
723 }
724 DisplayFormatType::TreeRender => {
725 write!(f, "")
727 }
728 }
729 }
730}
731
732impl MemSink {
733 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
737 if batches.is_empty() {
738 return plan_err!("Cannot insert into MemTable with zero partitions");
739 }
740 Ok(Self { batches, schema })
741 }
742}
743
744#[async_trait]
745impl DataSink for MemSink {
746 fn as_any(&self) -> &dyn Any {
747 self
748 }
749
750 fn schema(&self) -> &SchemaRef {
751 &self.schema
752 }
753
754 async fn write_all(
755 &self,
756 mut data: SendableRecordBatchStream,
757 _context: &Arc<TaskContext>,
758 ) -> Result<u64> {
759 let num_partitions = self.batches.len();
760
761 let mut new_batches = vec![vec![]; num_partitions];
764 let mut i = 0;
765 let mut row_count = 0;
766 while let Some(batch) = data.next().await.transpose()? {
767 row_count += batch.num_rows();
768 new_batches[i].push(batch);
769 i = (i + 1) % num_partitions;
770 }
771
772 for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
774 target.write().await.append(&mut batches);
776 }
777
778 Ok(row_count as u64)
779 }
780}
781
782#[cfg(test)]
783mod memory_source_tests {
784 use std::sync::Arc;
785
786 use crate::memory::MemorySourceConfig;
787 use crate::source::DataSourceExec;
788
789 use arrow::compute::SortOptions;
790 use arrow::datatypes::{DataType, Field, Schema};
791 use datafusion_common::Result;
792 use datafusion_physical_expr::expressions::col;
793 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
794 use datafusion_physical_plan::ExecutionPlan;
795
    /// Two orderings (`[a, b]` and `[c]`) given as sort information must both
    /// be present in the ordering equivalence class, and the reported output
    /// ordering must be their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1: LexOrdering = [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]
        .into();
        let sort2: LexOrdering = [PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]
        .into();
        // Expected output ordering: sort1 followed by sort2.
        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842 use crate::test_util::col;
843 use crate::tests::{aggr_test_schema, make_partition};
844
845 use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
846 use arrow::datatypes::{DataType, Field};
847 use datafusion_common::assert_batches_eq;
848 use datafusion_common::stats::{ColumnStatistics, Precision};
849 use datafusion_physical_expr::PhysicalSortExpr;
850 use datafusion_physical_plan::expressions::lit;
851
852 use datafusion_physical_plan::ExecutionPlan;
853 use futures::StreamExt;
854
    /// A fetch of 4 set through `with_fetch` limits the output of partition 0
    /// to the first four rows.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        // No limit is configured initially.
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }
880
    /// An empty values list is rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }
888
    /// Building from batches that share the given schema succeeds.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }
896
    /// Building from an empty list of batches fails.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }
903
    /// Building from batches whose schema differs from the given one fails.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }
916
    /// With a non-nullable column, a non-null literal is accepted and a null
    /// literal is rejected.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // A null value for the non-nullable column must produce an error.
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }
937
    /// Statistics of a values source made of three null rows: the row count
    /// and the null count both equal the number of rows.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.partition_statistics(None)?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    // Every row is null.
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
968
    /// Builds a batch of `row_size` rows with columns `a` (Int32), `b` (Utf8)
    /// and `c` (Int64), each filled with a constant value.
    fn batch(row_size: usize) -> RecordBatch {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    }
975
    /// Schema of the batches produced by `batch`.
    fn schema() -> SchemaRef {
        batch(1).schema()
    }
979
    /// Config without any partition.
    fn memorysrcconfig_no_partitions(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
987
    /// Config with one partition holding a single 100-row batch.
    fn memorysrcconfig_1_partition_1_batch(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
995
    /// Config with three partitions, each holding a single 100-row batch.
    fn memorysrcconfig_3_partitions_1_batch_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1003
    /// Config with three partitions, each holding two 100-row batches.
    fn memorysrcconfig_3_partitions_with_2_batches_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1015
    /// Config with one partition whose batches have decreasing row counts.
    fn memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1025
    /// Config with one partition whose batch order does not follow the batch
    /// sizes.
    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1036
    /// Config with two partitions of differently sized batches.
    fn memorysrcconfig_2_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100_000), batch(10_000), batch(1_000)],
            vec![batch(2_000), batch(20)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1047
    /// Config with two partitions mixing one very large batch with many tiny
    /// and empty batches.
    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![
                batch(100_000),
                batch(1),
                batch(1),
                batch(1),
                batch(1),
                batch(0),
            ],
            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }
1065
    /// Asserts that `partitioned_datasrc` is `None` when `partition_cnt` is
    /// `None`, and otherwise reports exactly `partition_cnt` output partitions.
    fn assert_partitioning(
        partitioned_datasrc: Option<Arc<dyn DataSource>>,
        partition_cnt: Option<usize>,
    ) {
        // Human-readable expectation used in the failure message.
        let should_exist = if let Some(partition_cnt) = partition_cnt {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        } else {
            "new datasource should not exist".into()
        };

        let actual = partitioned_datasrc
            .map(|datasrc| datasrc.output_partitioning().partition_count());
        assert_eq!(
            actual,
            partition_cnt,
            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
        );
    }
1087
    /// Runs the repartitioning scenarios shared by the tests with and without
    /// an output ordering.
    fn run_all_test_scenarios(
        output_ordering: Option<LexOrdering>,
        sort_information_on_config: Vec<LexOrdering>,
    ) -> Result<()> {
        // Value passed for the `_repartition_file_min_size` argument.
        let not_used = usize::MAX;

        // No partitions at all: nothing to repartition.
        let mem_src_config =
            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
        let partitioned_datasrc =
            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
        assert_partitioning(partitioned_datasrc, None);

        // One partition, target of one: no repartitioning.
        let target_partitions = 1;
        let mem_src_config =
            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Three partitions, target of three: no repartitioning.
        let target_partitions = 3;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Target below the current partition count: no repartitioning.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Target of four with only three batches in total: no repartitioning.
        let target_partitions = 4;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Six batches in three partitions, target of five: repartitioned.
        let target_partitions = 5;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(5));

        // Six batches in three partitions, target of six: repartitioned.
        let target_partitions = 6;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(6));

        // Target above the total number of batches: no repartitioning.
        let target_partitions = 3 * 2 + 1;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // One partition with differently sized batches, target of two.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
            sort_information_on_config,
        )?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            not_used,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        // Inspect the raw batches of the repartitioned source.
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // The largest batch is alone in the first partition.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // The remaining batches share the second partition.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 1);

        Ok(())
    }
1223
    /// Without sort information or output ordering, batches are spread by
    /// size; batches from different original partitions may end up together.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Call the size-based strategy directly to inspect the raw batches.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First partition: the largest batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the next largest batch.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 10_000);
        // Third partition: the remaining batches, largest first.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 1_000);
        assert_eq!(p3[2].num_rows(), 20);

        Ok(())
    }
1267
    /// Size-based repartitioning of very lopsided batch sizes still produces
    /// the requested number of partitions.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
    ) -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        let target_partitions = 5;
        let mem_src_config =
            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Call the size-based strategy directly to inspect the raw batches.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 5);
        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
        else {
            unreachable!()
        };
        // First partition: the 100_000-row batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the 100-row batch.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 100);
        // Third partition: three single-row batches.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 1);
        assert_eq!(p3[1].num_rows(), 1);
        assert_eq!(p3[2].num_rows(), 1);
        // Fourth partition: three single-row batches.
        assert_eq!(p4.len(), 3);
        assert_eq!(p4[0].num_rows(), 1);
        assert_eq!(p4[1].num_rows(), 1);
        assert_eq!(p4[2].num_rows(), 1);
        // Fifth partition: the last two single-row batches and the empty ones.
        assert_eq!(p5.len(), 4);
        assert_eq!(p5[0].num_rows(), 1);
        assert_eq!(p5[1].num_rows(), 1);
        assert_eq!(p5[2].num_rows(), 0);
        assert_eq!(p5[3].num_rows(), 0);

        Ok(())
    }
1329
    /// With sort information and a matching output ordering, repartitioning
    /// keeps the batches of each original partition together and in order.
    #[test]
    fn test_repartition_with_sort_information() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;

        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Call the order-preserving strategy directly to inspect the batches.
        let Some(output_ord) = output_ordering else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config
            .repartition_preserving_order(target_partitions, output_ord)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // The first original partition was split in two.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 2);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 1_000);
        // The second original partition is unchanged.
        assert_eq!(p3.len(), 2);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 20);

        Ok(())
    }
1377
    /// Order-preserving repartitioning keeps the original batch order even
    /// when it does not follow the batch sizes.
    #[test]
    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        let target_partitions = 2;
        let mem_src_config =
            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        // Inspect the raw batches of the repartitioned source.
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // The largest batch is alone in the first partition.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // The remaining batches keep their original order.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 1);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 10_000);

        Ok(())
    }
1422}