1use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use crate::sink::DataSink;
26use crate::source::{DataSource, DataSourceExec};
27
28use arrow::array::{RecordBatch, RecordBatchOptions};
29use arrow::datatypes::{Schema, SchemaRef};
30use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::equivalence::{
33 OrderingEquivalenceClass, ProjectionMapping,
34};
35use datafusion_physical_expr::expressions::Column;
36use datafusion_physical_expr::utils::collect_columns;
37use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
38use datafusion_physical_plan::memory::MemoryStream;
39use datafusion_physical_plan::projection::{
40 all_alias_free_columns, new_projections_for_columns, ProjectionExec,
41};
42use datafusion_physical_plan::{
43 common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
44 PhysicalExpr, SendableRecordBatchStream, Statistics,
45};
46
47use async_trait::async_trait;
48use datafusion_physical_plan::coop::cooperative;
49use datafusion_physical_plan::execution_plan::SchedulingType;
50use futures::StreamExt;
51use itertools::Itertools;
52use tokio::sync::RwLock;
53
#[derive(Clone, Debug)]
/// Configuration for a data source that serves pre-built, in-memory
/// [`RecordBatch`]es.
pub struct MemorySourceConfig {
    /// Record batches grouped by partition (the outer index is the partition).
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the stored batches, before any projection is applied.
    schema: SchemaRef,
    /// Result of applying `projection` to `schema`.
    projected_schema: SchemaRef,
    /// Optional column indices into `schema` selecting the output columns.
    projection: Option<Vec<usize>>,
    /// Known sort orderings; reported through `eq_properties`.
    sort_information: Vec<LexOrdering>,
    /// Whether `fmt_as` also prints the number of batches in each partition.
    show_sizes: bool,
    /// Optional limit handed to the `MemoryStream` created by `open`.
    fetch: Option<usize>,
}
75
76impl DataSource for MemorySourceConfig {
77 fn open(
78 &self,
79 partition: usize,
80 _context: Arc<TaskContext>,
81 ) -> Result<SendableRecordBatchStream> {
82 Ok(Box::pin(cooperative(
83 MemoryStream::try_new(
84 self.partitions[partition].clone(),
85 Arc::clone(&self.projected_schema),
86 self.projection.clone(),
87 )?
88 .with_fetch(self.fetch),
89 )))
90 }
91
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
95
96 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
97 match t {
98 DisplayFormatType::Default | DisplayFormatType::Verbose => {
99 let partition_sizes: Vec<_> =
100 self.partitions.iter().map(|b| b.len()).collect();
101
102 let output_ordering = self
103 .sort_information
104 .first()
105 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
106 .unwrap_or_default();
107
108 let eq_properties = self.eq_properties();
109 let constraints = eq_properties.constraints();
110 let constraints = if constraints.is_empty() {
111 String::new()
112 } else {
113 format!(", {constraints}")
114 };
115
116 let limit = self
117 .fetch
118 .map_or(String::new(), |limit| format!(", fetch={limit}"));
119 if self.show_sizes {
120 write!(
121 f,
122 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
123 partition_sizes.len(),
124 )
125 } else {
126 write!(
127 f,
128 "partitions={}{limit}{output_ordering}{constraints}",
129 partition_sizes.len(),
130 )
131 }
132 }
133 DisplayFormatType::TreeRender => {
134 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
135 let total_bytes: usize = self
136 .partitions
137 .iter()
138 .flatten()
139 .map(|batch| batch.get_array_memory_size())
140 .sum();
141 writeln!(f, "format=memory")?;
142 writeln!(f, "rows={total_rows}")?;
143 writeln!(f, "bytes={total_bytes}")?;
144 Ok(())
145 }
146 }
147 }
148
149 fn repartitioned(
154 &self,
155 target_partitions: usize,
156 _repartition_file_min_size: usize,
157 output_ordering: Option<LexOrdering>,
158 ) -> Result<Option<Arc<dyn DataSource>>> {
159 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
160 {
162 return Ok(None);
163 }
164
165 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
166 self.repartition_preserving_order(target_partitions, output_ordering)?
167 } else {
168 self.repartition_evenly_by_size(target_partitions)?
169 };
170
171 if let Some(repartitioned) = maybe_repartitioned {
172 Ok(Some(Arc::new(Self::try_new(
173 &repartitioned,
174 self.original_schema(),
175 self.projection.clone(),
176 )?)))
177 } else {
178 Ok(None)
179 }
180 }
181
    /// Reports unknown partitioning with one output partition per stored
    /// partition.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }
185
186 fn eq_properties(&self) -> EquivalenceProperties {
187 EquivalenceProperties::new_with_orderings(
188 Arc::clone(&self.projected_schema),
189 self.sort_information.clone(),
190 )
191 }
192
    /// Always [`SchedulingType::Cooperative`]; `open` wraps its stream with
    /// `cooperative`.
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }
196
197 fn statistics(&self) -> Result<Statistics> {
198 Ok(common::compute_record_batch_statistics(
199 &self.partitions,
200 &self.schema,
201 self.projection.clone(),
202 ))
203 }
204
205 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
206 let source = self.clone();
207 Some(Arc::new(source.with_limit(limit)))
208 }
209
    /// Returns the configured `fetch` limit, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
213
214 fn try_swapping_with_projection(
215 &self,
216 projection: &ProjectionExec,
217 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
218 all_alias_free_columns(projection.expr())
221 .then(|| {
222 let all_projections = (0..self.schema.fields().len()).collect();
223 let new_projections = new_projections_for_columns(
224 projection,
225 self.projection().as_ref().unwrap_or(&all_projections),
226 );
227
228 MemorySourceConfig::try_new_exec(
229 self.partitions(),
230 self.original_schema(),
231 Some(new_projections),
232 )
233 .map(|e| e as _)
234 })
235 .transpose()
236 }
237}
238
239impl MemorySourceConfig {
240 pub fn try_new(
243 partitions: &[Vec<RecordBatch>],
244 schema: SchemaRef,
245 projection: Option<Vec<usize>>,
246 ) -> Result<Self> {
247 let projected_schema = project_schema(&schema, projection.as_ref())?;
248 Ok(Self {
249 partitions: partitions.to_vec(),
250 schema,
251 projected_schema,
252 projection,
253 sort_information: vec![],
254 show_sizes: true,
255 fetch: None,
256 })
257 }
258
259 pub fn try_new_exec(
262 partitions: &[Vec<RecordBatch>],
263 schema: SchemaRef,
264 projection: Option<Vec<usize>>,
265 ) -> Result<Arc<DataSourceExec>> {
266 let source = Self::try_new(partitions, schema, projection)?;
267 Ok(DataSourceExec::from_data_source(source))
268 }
269
270 pub fn try_new_as_values(
272 schema: SchemaRef,
273 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
274 ) -> Result<Arc<DataSourceExec>> {
275 if data.is_empty() {
276 return plan_err!("Values list cannot be empty");
277 }
278
279 let n_row = data.len();
280 let n_col = schema.fields().len();
281
282 let placeholder_schema = Arc::new(Schema::empty());
285 let placeholder_batch = RecordBatch::try_new_with_options(
286 Arc::clone(&placeholder_schema),
287 vec![],
288 &RecordBatchOptions::new().with_row_count(Some(1)),
289 )?;
290
291 let arrays = (0..n_col)
293 .map(|j| {
294 (0..n_row)
295 .map(|i| {
296 let expr = &data[i][j];
297 let result = expr.evaluate(&placeholder_batch)?;
298
299 match result {
300 ColumnarValue::Scalar(scalar) => Ok(scalar),
301 ColumnarValue::Array(array) if array.len() == 1 => {
302 ScalarValue::try_from_array(&array, 0)
303 }
304 ColumnarValue::Array(_) => {
305 plan_err!("Cannot have array values in a values list")
306 }
307 }
308 })
309 .collect::<Result<Vec<_>>>()
310 .and_then(ScalarValue::iter_to_array)
311 })
312 .collect::<Result<Vec<_>>>()?;
313
314 let batch = RecordBatch::try_new_with_options(
315 Arc::clone(&schema),
316 arrays,
317 &RecordBatchOptions::new().with_row_count(Some(n_row)),
318 )?;
319
320 let partitions = vec![batch];
321 Self::try_new_from_batches(Arc::clone(&schema), partitions)
322 }
323
324 pub fn try_new_from_batches(
329 schema: SchemaRef,
330 batches: Vec<RecordBatch>,
331 ) -> Result<Arc<DataSourceExec>> {
332 if batches.is_empty() {
333 return plan_err!("Values list cannot be empty");
334 }
335
336 for batch in &batches {
337 let batch_schema = batch.schema();
338 if batch_schema != schema {
339 return plan_err!(
340 "Batch has invalid schema. Expected: {}, got: {}",
341 schema,
342 batch_schema
343 );
344 }
345 }
346
347 let partitions = vec![batches];
348 let source = Self {
349 partitions,
350 schema: Arc::clone(&schema),
351 projected_schema: Arc::clone(&schema),
352 projection: None,
353 sort_information: vec![],
354 show_sizes: true,
355 fetch: None,
356 };
357 Ok(DataSourceExec::from_data_source(source))
358 }
359
360 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
362 self.fetch = limit;
363 self
364 }
365
366 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
368 self.show_sizes = show_sizes;
369 self
370 }
371
    /// Returns the stored batches, grouped by partition.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }
376
    /// Returns the optional projection (column indices into the original
    /// schema).
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }
381
    /// Returns whether `fmt_as` prints per-partition batch counts.
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }
386
    /// Returns the known sort orderings.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }
391
    /// Sets the known sort orderings, consuming and returning the
    /// configuration.
    ///
    /// `sort_information` must be expressed against the original
    /// (unprojected) schema. When a projection is configured, the orderings
    /// are projected onto the projected schema before being stored.
    ///
    /// # Errors
    /// Returns an internal error when a column referenced by a sort
    /// expression has an index outside the original schema, or a name that
    /// differs from the field at that index. Errors from building the
    /// projection mapping are propagated.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        let fields = self.schema.fields();
        // Find the first column used by any sort expression that does not
        // line up (by index and name) with the original schema.
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    // An out-of-range index also counts as a mismatch.
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the MemorySourceConfig",
                col
            );
        }

        if let Some(projection) = &self.projection {
            let base_schema = self.original_schema();
            // One (column expression, output name) pair per projected index.
            let proj_exprs = projection.iter().map(|idx| {
                let name = base_schema.field(*idx).name();
                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
            });
            let projection_mapping =
                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
            // Express the orderings over the original schema, then project
            // them onto the projected schema.
            let base_eqp = EquivalenceProperties::new_with_orderings(
                Arc::clone(&base_schema),
                sort_information,
            );
            let proj_eqp =
                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
            sort_information = oeq_class.into();
        }

        self.sort_information = sort_information;
        Ok(self)
    }
456
    /// Returns the original (unprojected) schema.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
461
    /// Repartitions by repeatedly splitting the partition with the most rows,
    /// keeping the batches of every resulting partition in their original
    /// relative order.
    ///
    /// Returns `Ok(None)` when `output_ordering` is not satisfied by this
    /// source's equivalence properties, or when there are fewer batches than
    /// `target_partitions`.
    ///
    /// Callers are expected to pass `target_partitions` greater than the
    /// current partition count; `repartitioned` guarantees this.
    fn repartition_preserving_order(
        &self,
        target_partitions: usize,
        output_ordering: LexOrdering,
    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
        if !self.eq_properties().ordering_satisfy(output_ordering)? {
            Ok(None)
        } else {
            let total_num_batches =
                self.partitions.iter().map(|b| b.len()).sum::<usize>();
            if total_num_batches < target_partitions {
                // Batches are never split, so there is no way to reach the
                // requested partition count.
                return Ok(None);
            }

            // Number of additional partitions that have to be created.
            let cnt_to_repartition = target_partitions - self.partitions.len();

            // Label every current partition with its position. The labels are
            // spaced out so pieces split off later can be placed right after
            // their origin.
            let to_repartition = self
                .partitions
                .iter()
                .enumerate()
                .map(|(idx, batches)| RePartition {
                    idx: idx + (cnt_to_repartition * idx),
                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
                    batches: batches.clone(),
                })
                .collect_vec();

            // Max-heap keyed on row count (see the `Ord` impl of
            // `RePartition`).
            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
            for rep in to_repartition {
                max_heap.push(rep);
            }

            // One split attempt per additional partition that is needed.
            let mut cannot_split_further = Vec::with_capacity(target_partitions);
            for _ in 0..cnt_to_repartition {
                loop {
                    // Take the partition with the most rows and try to split
                    // it.
                    let Some(to_split) = max_heap.pop() else {
                        // Nothing left that could be split.
                        break;
                    };

                    let mut new_partitions = to_split.split();
                    if new_partitions.len() > 1 {
                        for new_partition in new_partitions {
                            max_heap.push(new_partition);
                        }
                        // One more partition was created; go on to the next
                        // attempt.
                        break;
                    } else {
                        // Unsplittable: set it aside and try the next largest.
                        cannot_split_further.push(new_partitions.remove(0));
                    }
                }
            }
            let mut partitions = max_heap.drain().collect_vec();
            partitions.extend(cannot_split_further);

            // Restore the positional order recorded in `idx`.
            partitions.sort_by_key(|p| p.idx);
            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();

            Ok(Some(partitions))
        }
    }
540
541 fn repartition_evenly_by_size(
550 &self,
551 target_partitions: usize,
552 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
553 let mut flatten_batches =
555 self.partitions.clone().into_iter().flatten().collect_vec();
556 if flatten_batches.len() < target_partitions {
557 return Ok(None);
558 }
559
560 let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
562 flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
564
565 let mut partitions =
567 vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
568 let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
569 let mut total_rows_seen = 0;
570 let mut curr_bin_row_count = 0;
571 let mut idx = 0;
572 for batch in flatten_batches {
573 let row_cnt = batch.num_rows();
574 idx = std::cmp::min(idx, target_partitions - 1);
575
576 partitions[idx].push(batch);
577 curr_bin_row_count += row_cnt;
578 total_rows_seen += row_cnt;
579
580 if curr_bin_row_count >= target_partition_size {
581 idx += 1;
582 curr_bin_row_count = 0;
583
584 if total_rows_seen < total_num_rows {
587 target_partition_size = (total_num_rows - total_rows_seen)
588 .div_ceil(target_partitions - idx);
589 }
590 }
591 }
592
593 Ok(Some(partitions))
594 }
595}
596
/// A partition being redistributed by
/// `MemorySourceConfig::repartition_preserving_order`.
struct RePartition {
    /// Position label; the final partitions are sorted by this value.
    idx: usize,
    /// Total number of rows across `batches`; the key used for comparisons.
    row_count: usize,
    /// Batches of this partition, in their original relative order.
    batches: Vec<RecordBatch>,
}
609
610impl RePartition {
611 fn split(self) -> Vec<Self> {
615 if self.batches.len() == 1 {
616 return vec![self];
617 }
618
619 let new_0 = RePartition {
620 idx: self.idx, row_count: 0,
622 batches: vec![],
623 };
624 let new_1 = RePartition {
625 idx: self.idx + 1, row_count: 0,
627 batches: vec![],
628 };
629 let split_pt = self.row_count / 2;
630
631 let [new_0, new_1] = self.batches.into_iter().fold(
632 [new_0, new_1],
633 |[mut new0, mut new1], batch| {
634 if new0.row_count < split_pt {
635 new0.add_batch(batch);
636 } else {
637 new1.add_batch(batch);
638 }
639 [new0, new1]
640 },
641 );
642 vec![new_0, new_1]
643 }
644
645 fn add_batch(&mut self, batch: RecordBatch) {
646 self.row_count += batch.num_rows();
647 self.batches.push(batch);
648 }
649}
650
651impl PartialOrd for RePartition {
652 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
653 Some(self.row_count.cmp(&other.row_count))
654 }
655}
656
657impl Ord for RePartition {
658 fn cmp(&self, other: &Self) -> Ordering {
659 self.row_count.cmp(&other.row_count)
660 }
661}
662
663impl PartialEq for RePartition {
664 fn eq(&self, other: &Self) -> bool {
665 self.row_count.eq(&other.row_count)
666 }
667}
668
// Marker impl: `Ord` requires `Eq`; equality is defined by `PartialEq`.
impl Eq for RePartition {}
670
671impl fmt::Display for RePartition {
672 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
673 write!(
674 f,
675 "{}rows-in-{}batches@{}",
676 self.row_count,
677 self.batches.len(),
678 self.idx
679 )
680 }
681}
682
/// The batches of one partition, shared behind an async read-write lock.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
685
/// A sink that appends incoming record batches to in-memory partitions.
pub struct MemSink {
    /// Target partitions; `write_all` appends to each of them.
    batches: Vec<PartitionData>,
    /// Schema returned by `DataSink::schema`.
    schema: SchemaRef,
}
694
695impl Debug for MemSink {
696 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697 f.debug_struct("MemSink")
698 .field("num_partitions", &self.batches.len())
699 .finish()
700 }
701}
702
703impl DisplayAs for MemSink {
704 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
705 match t {
706 DisplayFormatType::Default | DisplayFormatType::Verbose => {
707 let partition_count = self.batches.len();
708 write!(f, "MemoryTable (partitions={partition_count})")
709 }
710 DisplayFormatType::TreeRender => {
711 write!(f, "")
713 }
714 }
715 }
716}
717
718impl MemSink {
719 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
723 if batches.is_empty() {
724 return plan_err!("Cannot insert into MemTable with zero partitions");
725 }
726 Ok(Self { batches, schema })
727 }
728}
729
730#[async_trait]
731impl DataSink for MemSink {
732 fn as_any(&self) -> &dyn Any {
733 self
734 }
735
736 fn schema(&self) -> &SchemaRef {
737 &self.schema
738 }
739
740 async fn write_all(
741 &self,
742 mut data: SendableRecordBatchStream,
743 _context: &Arc<TaskContext>,
744 ) -> Result<u64> {
745 let num_partitions = self.batches.len();
746
747 let mut new_batches = vec![vec![]; num_partitions];
750 let mut i = 0;
751 let mut row_count = 0;
752 while let Some(batch) = data.next().await.transpose()? {
753 row_count += batch.num_rows();
754 new_batches[i].push(batch);
755 i = (i + 1) % num_partitions;
756 }
757
758 for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
760 target.write().await.append(&mut batches);
762 }
763
764 Ok(row_count as u64)
765 }
766}
767
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Two orderings (`[a, b]` and `[c]`) registered on the source must both
    /// be present in the ordering equivalence class, and the reported output
    /// ordering must be their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1: LexOrdering = [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]
        .into();
        let sort2: LexOrdering = [PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]
        .into();
        // Expected output ordering: sort1 followed by sort2.
        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        // A single partition without batches is enough: only the plan
        // properties are inspected.
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
824
825#[cfg(test)]
826mod tests {
827 use super::*;
828 use crate::test_util::col;
829 use crate::tests::{aggr_test_schema, make_partition};
830
831 use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
832 use arrow::datatypes::{DataType, Field};
833 use datafusion_common::assert_batches_eq;
834 use datafusion_common::stats::{ColumnStatistics, Precision};
835 use datafusion_physical_expr::PhysicalSortExpr;
836 use datafusion_physical_plan::expressions::lit;
837
838 use futures::StreamExt;
839
840 #[tokio::test]
841 async fn exec_with_limit() -> Result<()> {
842 let task_ctx = Arc::new(TaskContext::default());
843 let batch = make_partition(7);
844 let schema = batch.schema();
845 let batches = vec![batch.clone(), batch];
846
847 let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
848 assert_eq!(exec.fetch(), None);
849
850 let exec = exec.with_fetch(Some(4)).unwrap();
851 assert_eq!(exec.fetch(), Some(4));
852
853 let mut it = exec.execute(0, task_ctx)?;
854 let mut results = vec![];
855 while let Some(batch) = it.next().await {
856 results.push(batch?);
857 }
858
859 let expected = [
860 "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
861 ];
862 assert_batches_eq!(expected, &results);
863 Ok(())
864 }
865
866 #[tokio::test]
867 async fn values_empty_case() -> Result<()> {
868 let schema = aggr_test_schema();
869 let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
870 assert!(empty.is_err());
871 Ok(())
872 }
873
874 #[test]
875 fn new_exec_with_batches() {
876 let batch = make_partition(7);
877 let schema = batch.schema();
878 let batches = vec![batch.clone(), batch];
879 let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
880 }
881
882 #[test]
883 fn new_exec_with_batches_empty() {
884 let batch = make_partition(7);
885 let schema = batch.schema();
886 let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
887 }
888
889 #[test]
890 fn new_exec_with_batches_invalid_schema() {
891 let batch = make_partition(7);
892 let batches = vec![batch.clone(), batch];
893
894 let invalid_schema = Arc::new(Schema::new(vec![
895 Field::new("col0", DataType::UInt32, false),
896 Field::new("col1", DataType::Utf8, false),
897 ]));
898 let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
899 .unwrap_err();
900 }
901
902 #[test]
904 fn new_exec_with_non_nullable_schema() {
905 let schema = Arc::new(Schema::new(vec![Field::new(
906 "col0",
907 DataType::UInt32,
908 false,
909 )]));
910 let _ = MemorySourceConfig::try_new_as_values(
911 Arc::clone(&schema),
912 vec![vec![lit(1u32)]],
913 )
914 .unwrap();
915 let _ = MemorySourceConfig::try_new_as_values(
917 schema,
918 vec![vec![lit(ScalarValue::UInt32(None))]],
919 )
920 .unwrap_err();
921 }
922
923 #[test]
924 fn values_stats_with_nulls_only() -> Result<()> {
925 let data = vec![
926 vec![lit(ScalarValue::Null)],
927 vec![lit(ScalarValue::Null)],
928 vec![lit(ScalarValue::Null)],
929 ];
930 let rows = data.len();
931 let values = MemorySourceConfig::try_new_as_values(
932 Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
933 data,
934 )?;
935
936 assert_eq!(
937 values.partition_statistics(None)?,
938 Statistics {
939 num_rows: Precision::Exact(rows),
940 total_byte_size: Precision::Exact(8), column_statistics: vec![ColumnStatistics {
942 null_count: Precision::Exact(rows), distinct_count: Precision::Absent,
944 max_value: Precision::Absent,
945 min_value: Precision::Absent,
946 sum_value: Precision::Absent,
947 },],
948 }
949 );
950
951 Ok(())
952 }
953
954 fn batch(row_size: usize) -> RecordBatch {
955 let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
956 let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
957 let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
958 RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
959 }
960
    /// Schema of the batches produced by `batch`.
    fn schema() -> SchemaRef {
        batch(1).schema()
    }
964
965 fn memorysrcconfig_no_partitions(
966 sort_information: Vec<LexOrdering>,
967 ) -> Result<MemorySourceConfig> {
968 let partitions = vec![];
969 MemorySourceConfig::try_new(&partitions, schema(), None)?
970 .try_with_sort_information(sort_information)
971 }
972
973 fn memorysrcconfig_1_partition_1_batch(
974 sort_information: Vec<LexOrdering>,
975 ) -> Result<MemorySourceConfig> {
976 let partitions = vec![vec![batch(100)]];
977 MemorySourceConfig::try_new(&partitions, schema(), None)?
978 .try_with_sort_information(sort_information)
979 }
980
981 fn memorysrcconfig_3_partitions_1_batch_each(
982 sort_information: Vec<LexOrdering>,
983 ) -> Result<MemorySourceConfig> {
984 let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
985 MemorySourceConfig::try_new(&partitions, schema(), None)?
986 .try_with_sort_information(sort_information)
987 }
988
989 fn memorysrcconfig_3_partitions_with_2_batches_each(
990 sort_information: Vec<LexOrdering>,
991 ) -> Result<MemorySourceConfig> {
992 let partitions = vec![
993 vec![batch(100), batch(100)],
994 vec![batch(100), batch(100)],
995 vec![batch(100), batch(100)],
996 ];
997 MemorySourceConfig::try_new(&partitions, schema(), None)?
998 .try_with_sort_information(sort_information)
999 }
1000
1001 fn memorysrcconfig_1_partition_with_different_sized_batches(
1004 sort_information: Vec<LexOrdering>,
1005 ) -> Result<MemorySourceConfig> {
1006 let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
1007 MemorySourceConfig::try_new(&partitions, schema(), None)?
1008 .try_with_sort_information(sort_information)
1009 }
1010
1011 fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
1015 sort_information: Vec<LexOrdering>,
1016 ) -> Result<MemorySourceConfig> {
1017 let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
1018 MemorySourceConfig::try_new(&partitions, schema(), None)?
1019 .try_with_sort_information(sort_information)
1020 }
1021
1022 fn memorysrcconfig_2_partition_with_different_sized_batches(
1023 sort_information: Vec<LexOrdering>,
1024 ) -> Result<MemorySourceConfig> {
1025 let partitions = vec![
1026 vec![batch(100_000), batch(10_000), batch(1_000)],
1027 vec![batch(2_000), batch(20)],
1028 ];
1029 MemorySourceConfig::try_new(&partitions, schema(), None)?
1030 .try_with_sort_information(sort_information)
1031 }
1032
1033 fn memorysrcconfig_2_partition_with_extreme_sized_batches(
1034 sort_information: Vec<LexOrdering>,
1035 ) -> Result<MemorySourceConfig> {
1036 let partitions = vec![
1037 vec![
1038 batch(100_000),
1039 batch(1),
1040 batch(1),
1041 batch(1),
1042 batch(1),
1043 batch(0),
1044 ],
1045 vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
1046 ];
1047 MemorySourceConfig::try_new(&partitions, schema(), None)?
1048 .try_with_sort_information(sort_information)
1049 }
1050
1051 fn assert_partitioning(
1055 partitioned_datasrc: Option<Arc<dyn DataSource>>,
1056 partition_cnt: Option<usize>,
1057 ) {
1058 let should_exist = if let Some(partition_cnt) = partition_cnt {
1059 format!("new datasource should exist and have {partition_cnt:?} partitions")
1060 } else {
1061 "new datasource should not exist".into()
1062 };
1063
1064 let actual = partitioned_datasrc
1065 .map(|datasrc| datasrc.output_partitioning().partition_count());
1066 assert_eq!(
1067 actual,
1068 partition_cnt,
1069 "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
1070 );
1071 }
1072
1073 fn run_all_test_scenarios(
1074 output_ordering: Option<LexOrdering>,
1075 sort_information_on_config: Vec<LexOrdering>,
1076 ) -> Result<()> {
1077 let not_used = usize::MAX;
1078
1079 let mem_src_config =
1081 memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
1082 let partitioned_datasrc =
1083 mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
1084 assert_partitioning(partitioned_datasrc, None);
1085
1086 let target_partitions = 1;
1088 let mem_src_config =
1089 memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
1090 let partitioned_datasrc = mem_src_config.repartitioned(
1091 target_partitions,
1092 not_used,
1093 output_ordering.clone(),
1094 )?;
1095 assert_partitioning(partitioned_datasrc, None);
1096
1097 let target_partitions = 3;
1099 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1100 sort_information_on_config.clone(),
1101 )?;
1102 let partitioned_datasrc = mem_src_config.repartitioned(
1103 target_partitions,
1104 not_used,
1105 output_ordering.clone(),
1106 )?;
1107 assert_partitioning(partitioned_datasrc, None);
1108
1109 let target_partitions = 2;
1111 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1112 sort_information_on_config.clone(),
1113 )?;
1114 let partitioned_datasrc = mem_src_config.repartitioned(
1115 target_partitions,
1116 not_used,
1117 output_ordering.clone(),
1118 )?;
1119 assert_partitioning(partitioned_datasrc, None);
1120
1121 let target_partitions = 4;
1123 let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
1124 sort_information_on_config.clone(),
1125 )?;
1126 let partitioned_datasrc = mem_src_config.repartitioned(
1127 target_partitions,
1128 not_used,
1129 output_ordering.clone(),
1130 )?;
1131 assert_partitioning(partitioned_datasrc, None);
1132
1133 let target_partitions = 5;
1136 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1137 sort_information_on_config.clone(),
1138 )?;
1139 let partitioned_datasrc = mem_src_config.repartitioned(
1140 target_partitions,
1141 not_used,
1142 output_ordering.clone(),
1143 )?;
1144 assert_partitioning(partitioned_datasrc, Some(5));
1145
1146 let target_partitions = 6;
1149 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1150 sort_information_on_config.clone(),
1151 )?;
1152 let partitioned_datasrc = mem_src_config.repartitioned(
1153 target_partitions,
1154 not_used,
1155 output_ordering.clone(),
1156 )?;
1157 assert_partitioning(partitioned_datasrc, Some(6));
1158
1159 let target_partitions = 3 * 2 + 1;
1161 let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
1162 sort_information_on_config.clone(),
1163 )?;
1164 let partitioned_datasrc = mem_src_config.repartitioned(
1165 target_partitions,
1166 not_used,
1167 output_ordering.clone(),
1168 )?;
1169 assert_partitioning(partitioned_datasrc, None);
1170
1171 let target_partitions = 2;
1174 let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
1175 sort_information_on_config,
1176 )?;
1177 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1178 target_partitions,
1179 not_used,
1180 output_ordering,
1181 )?;
1182 assert_partitioning(partitioned_datasrc.clone(), Some(2));
1183 let partitioned_datasrc = partitioned_datasrc.unwrap();
1186 let Some(mem_src_config) = partitioned_datasrc
1187 .as_any()
1188 .downcast_ref::<MemorySourceConfig>()
1189 else {
1190 unreachable!()
1191 };
1192 let repartitioned_raw_batches = mem_src_config.partitions.clone();
1193 assert_eq!(repartitioned_raw_batches.len(), 2);
1194 let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1195 unreachable!()
1196 };
1197 assert_eq!(p1.len(), 1);
1199 assert_eq!(p1[0].num_rows(), 100_000);
1200 assert_eq!(p2.len(), 3);
1202 assert_eq!(p2[0].num_rows(), 10_000);
1203 assert_eq!(p2[1].num_rows(), 100);
1204 assert_eq!(p2[2].num_rows(), 1);
1205
1206 Ok(())
1207 }
1208
1209 #[test]
1210 fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
1211 let no_sort = vec![];
1212 let no_output_ordering = None;
1213
1214 run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
1216
1217 let target_partitions = 3;
1221 let mem_src_config =
1222 memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
1223 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1224 target_partitions,
1225 usize::MAX,
1226 no_output_ordering,
1227 )?;
1228 assert_partitioning(partitioned_datasrc.clone(), Some(3));
1229 let repartitioned_raw_batches = mem_src_config
1232 .repartition_evenly_by_size(target_partitions)?
1233 .unwrap();
1234 assert_eq!(repartitioned_raw_batches.len(), 3);
1235 let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1236 unreachable!()
1237 };
1238 assert_eq!(p1.len(), 1);
1240 assert_eq!(p1[0].num_rows(), 100_000);
1241 assert_eq!(p2.len(), 1);
1243 assert_eq!(p2[0].num_rows(), 10_000);
1244 assert_eq!(p3.len(), 3);
1246 assert_eq!(p3[0].num_rows(), 2_000);
1247 assert_eq!(p3[1].num_rows(), 1_000);
1248 assert_eq!(p3[2].num_rows(), 20);
1249
1250 Ok(())
1251 }
1252
1253 #[test]
1254 fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
1255 ) -> Result<()> {
1256 let no_sort = vec![];
1257 let no_output_ordering = None;
1258
1259 let target_partitions = 5;
1270 let mem_src_config =
1271 memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
1272 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1273 target_partitions,
1274 usize::MAX,
1275 no_output_ordering,
1276 )?;
1277 assert_partitioning(partitioned_datasrc.clone(), Some(5));
1278 let repartitioned_raw_batches = mem_src_config
1282 .repartition_evenly_by_size(target_partitions)?
1283 .unwrap();
1284 assert_eq!(repartitioned_raw_batches.len(), 5);
1285 let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
1286 else {
1287 unreachable!()
1288 };
1289 assert_eq!(p1.len(), 1);
1291 assert_eq!(p1[0].num_rows(), 100_000);
1292 assert_eq!(p2.len(), 1);
1294 assert_eq!(p2[0].num_rows(), 100);
1295 assert_eq!(p3.len(), 3);
1297 assert_eq!(p3[0].num_rows(), 1);
1298 assert_eq!(p3[1].num_rows(), 1);
1299 assert_eq!(p3[2].num_rows(), 1);
1300 assert_eq!(p4.len(), 3);
1302 assert_eq!(p4[0].num_rows(), 1);
1303 assert_eq!(p4[1].num_rows(), 1);
1304 assert_eq!(p4[2].num_rows(), 1);
1305 assert_eq!(p5.len(), 4);
1307 assert_eq!(p5[0].num_rows(), 1);
1308 assert_eq!(p5[1].num_rows(), 1);
1309 assert_eq!(p5[2].num_rows(), 0);
1310 assert_eq!(p5[3].num_rows(), 0);
1311
1312 Ok(())
1313 }
1314
1315 #[test]
1316 fn test_repartition_with_sort_information() -> Result<()> {
1317 let schema = schema();
1318 let sort_key: LexOrdering =
1319 [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1320 let has_sort = vec![sort_key.clone()];
1321 let output_ordering = Some(sort_key);
1322
1323 run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
1325
1326 let target_partitions = 3;
1328 let mem_src_config =
1329 memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
1330 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1331 target_partitions,
1332 usize::MAX,
1333 output_ordering.clone(),
1334 )?;
1335 assert_partitioning(partitioned_datasrc.clone(), Some(3));
1336 let Some(output_ord) = output_ordering else {
1339 unreachable!()
1340 };
1341 let repartitioned_raw_batches = mem_src_config
1342 .repartition_preserving_order(target_partitions, output_ord)?
1343 .unwrap();
1344 assert_eq!(repartitioned_raw_batches.len(), 3);
1345 let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
1346 unreachable!()
1347 };
1348 assert_eq!(p1.len(), 1);
1350 assert_eq!(p1[0].num_rows(), 100_000);
1351 assert_eq!(p2.len(), 2);
1353 assert_eq!(p2[0].num_rows(), 10_000);
1354 assert_eq!(p2[1].num_rows(), 1_000);
1355 assert_eq!(p3.len(), 2);
1357 assert_eq!(p3[0].num_rows(), 2_000);
1358 assert_eq!(p3[1].num_rows(), 20);
1359
1360 Ok(())
1361 }
1362
1363 #[test]
1364 fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
1365 let schema = schema();
1366 let sort_key: LexOrdering =
1367 [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
1368 let has_sort = vec![sort_key.clone()];
1369 let output_ordering = Some(sort_key);
1370
1371 let target_partitions = 2;
1374 let mem_src_config =
1375 memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
1376 let partitioned_datasrc = mem_src_config.clone().repartitioned(
1377 target_partitions,
1378 usize::MAX,
1379 output_ordering,
1380 )?;
1381 assert_partitioning(partitioned_datasrc.clone(), Some(2));
1382 let partitioned_datasrc = partitioned_datasrc.unwrap();
1385 let Some(mem_src_config) = partitioned_datasrc
1386 .as_any()
1387 .downcast_ref::<MemorySourceConfig>()
1388 else {
1389 unreachable!()
1390 };
1391 let repartitioned_raw_batches = mem_src_config.partitions.clone();
1392 assert_eq!(repartitioned_raw_batches.len(), 2);
1393 let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
1394 unreachable!()
1395 };
1396 assert_eq!(p1.len(), 1);
1398 assert_eq!(p1[0].num_rows(), 100_000);
1399 assert_eq!(p2.len(), 3);
1401 assert_eq!(p2[0].num_rows(), 1);
1402 assert_eq!(p2[1].num_rows(), 100);
1403 assert_eq!(p2[2].num_rows(), 10_000);
1404
1405 Ok(())
1406 }
1407}