1use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Deref;
24use std::slice::from_ref;
25use std::sync::Arc;
26
27use crate::sink::DataSink;
28use crate::source::{DataSource, DataSourceExec};
29
30use arrow::array::{RecordBatch, RecordBatchOptions};
31use arrow::datatypes::{Schema, SchemaRef};
32use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::equivalence::project_orderings;
35use datafusion_physical_expr::utils::collect_columns;
36use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
37use datafusion_physical_plan::memory::MemoryStream;
38use datafusion_physical_plan::projection::{
39 all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
40};
41use datafusion_physical_plan::{
42 common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
43 SendableRecordBatchStream, Statistics,
44};
45
46use async_trait::async_trait;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use futures::StreamExt;
50use itertools::Itertools;
51use tokio::sync::RwLock;
52
/// Data source configuration for scanning in-memory `RecordBatch`es.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query; each inner `Vec` is one partition's batches.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection.
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied.
    projected_schema: SchemaRef,
    /// Optional projection: indices of the columns to produce.
    projection: Option<Vec<usize>>,
    /// Orderings the data is known to satisfy (used to build equivalence
    /// properties; the first entry is shown as the output ordering).
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes are included in `EXPLAIN` output.
    show_sizes: bool,
    /// Maximum number of rows to produce; `None` means no limit.
    fetch: Option<usize>,
}
74
75impl DataSource for MemorySourceConfig {
76 fn open(
77 &self,
78 partition: usize,
79 _context: Arc<TaskContext>,
80 ) -> Result<SendableRecordBatchStream> {
81 Ok(Box::pin(cooperative(
82 MemoryStream::try_new(
83 self.partitions[partition].clone(),
84 Arc::clone(&self.projected_schema),
85 self.projection.clone(),
86 )?
87 .with_fetch(self.fetch),
88 )))
89 }
90
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94
95 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
96 match t {
97 DisplayFormatType::Default | DisplayFormatType::Verbose => {
98 let partition_sizes: Vec<_> =
99 self.partitions.iter().map(|b| b.len()).collect();
100
101 let output_ordering = self
102 .sort_information
103 .first()
104 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
105 .unwrap_or_default();
106
107 let eq_properties = self.eq_properties();
108 let constraints = eq_properties.constraints();
109 let constraints = if constraints.is_empty() {
110 String::new()
111 } else {
112 format!(", {constraints}")
113 };
114
115 let limit = self
116 .fetch
117 .map_or(String::new(), |limit| format!(", fetch={limit}"));
118 if self.show_sizes {
119 write!(
120 f,
121 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
122 partition_sizes.len(),
123 )
124 } else {
125 write!(
126 f,
127 "partitions={}{limit}{output_ordering}{constraints}",
128 partition_sizes.len(),
129 )
130 }
131 }
132 DisplayFormatType::TreeRender => {
133 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
134 let total_bytes: usize = self
135 .partitions
136 .iter()
137 .flatten()
138 .map(|batch| batch.get_array_memory_size())
139 .sum();
140 writeln!(f, "format=memory")?;
141 writeln!(f, "rows={total_rows}")?;
142 writeln!(f, "bytes={total_bytes}")?;
143 Ok(())
144 }
145 }
146 }
147
148 fn repartitioned(
153 &self,
154 target_partitions: usize,
155 _repartition_file_min_size: usize,
156 output_ordering: Option<LexOrdering>,
157 ) -> Result<Option<Arc<dyn DataSource>>> {
158 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
159 {
161 return Ok(None);
162 }
163
164 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
165 self.repartition_preserving_order(target_partitions, output_ordering)?
166 } else {
167 self.repartition_evenly_by_size(target_partitions)?
168 };
169
170 if let Some(repartitioned) = maybe_repartitioned {
171 Ok(Some(Arc::new(Self::try_new(
172 &repartitioned,
173 self.original_schema(),
174 self.projection.clone(),
175 )?)))
176 } else {
177 Ok(None)
178 }
179 }
180
181 fn output_partitioning(&self) -> Partitioning {
182 Partitioning::UnknownPartitioning(self.partitions.len())
183 }
184
185 fn eq_properties(&self) -> EquivalenceProperties {
186 EquivalenceProperties::new_with_orderings(
187 Arc::clone(&self.projected_schema),
188 self.sort_information.clone(),
189 )
190 }
191
192 fn scheduling_type(&self) -> SchedulingType {
193 SchedulingType::Cooperative
194 }
195
196 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
197 if let Some(partition) = partition {
198 if let Some(batches) = self.partitions.get(partition) {
200 Ok(common::compute_record_batch_statistics(
201 from_ref(batches),
202 &self.schema,
203 self.projection.clone(),
204 ))
205 } else {
206 Ok(Statistics::new_unknown(&self.projected_schema))
208 }
209 } else {
210 Ok(common::compute_record_batch_statistics(
212 &self.partitions,
213 &self.schema,
214 self.projection.clone(),
215 ))
216 }
217 }
218
219 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
220 let source = self.clone();
221 Some(Arc::new(source.with_limit(limit)))
222 }
223
224 fn fetch(&self) -> Option<usize> {
225 self.fetch
226 }
227
228 fn try_swapping_with_projection(
229 &self,
230 projection: &[ProjectionExpr],
231 ) -> Result<Option<Arc<dyn DataSource>>> {
232 all_alias_free_columns(projection)
235 .then(|| {
236 let all_projections = (0..self.schema.fields().len()).collect();
237 let new_projections = new_projections_for_columns(
238 projection,
239 self.projection().as_ref().unwrap_or(&all_projections),
240 );
241
242 MemorySourceConfig::try_new(
243 self.partitions(),
244 self.original_schema(),
245 Some(new_projections),
246 )
247 .map(|s| Arc::new(s) as Arc<dyn DataSource>)
248 })
249 .transpose()
250 }
251}
252
impl MemorySourceConfig {
    /// Creates a new source from `partitions` of [`RecordBatch`]es with an
    /// optional projection (column indices into `schema`).
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Convenience constructor: builds the config and wraps it in a
    /// [`DataSourceExec`] plan node.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<DataSourceExec>> {
        let source = Self::try_new(partitions, schema, projection)?;
        Ok(DataSourceExec::from_data_source(source))
    }

    /// Creates a plan node from literal row expressions (`VALUES`-style
    /// input): each inner `Vec` is one row, and each expression is evaluated
    /// to a scalar.
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Arc<DataSourceExec>> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // A zero-column batch with a declared row count of 1: evaluation
        // context for the (constant) row expressions.
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each expression to a scalar, then build one array per
        // column from the column's scalars.
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            // A one-element array is accepted as a scalar.
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Creates a single-partition plan node from existing batches, verifying
    /// that every batch's schema matches `schema` exactly.
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Arc<DataSourceExec>> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = Self {
            partitions,
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        Ok(DataSourceExec::from_data_source(source))
    }

    /// Sets the row limit applied when this source is scanned.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Controls whether partition sizes are shown in `EXPLAIN` output.
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        self.show_sizes = show_sizes;
        self
    }

    /// Returns the raw (unprojected) partitions.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Returns the optional projection (column indices).
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Returns whether partition sizes are displayed.
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Returns the orderings the data is known to satisfy.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// Attaches sort information to this source. Every column referenced by
    /// the orderings must exist in the original schema (matched by index AND
    /// name); when a projection is set, the orderings are rewritten in terms
    /// of the projected schema.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // Find any sort column whose (index, name) pair does not line up with
        // the original schema.
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the MemorySourceConfig",
                col
            );
        }

        // Project the orderings when only a subset of columns is exposed.
        if self.projection.is_some() {
            sort_information =
                project_orderings(&sort_information, &self.projected_schema);
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// Returns the original (unprojected) schema.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Splits the largest partitions (by row count) until `target_partitions`
    /// is reached, preserving the relative order of batches so that
    /// `output_ordering` still holds. Returns `Ok(None)` when the ordering is
    /// not satisfied by this source or there are fewer batches than the
    /// requested partition count.
    fn repartition_preserving_order(
        &self,
        target_partitions: usize,
        output_ordering: LexOrdering,
    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
        if !self.eq_properties().ordering_satisfy(output_ordering)? {
            Ok(None)
        } else {
            let total_num_batches =
                self.partitions.iter().map(|b| b.len()).sum::<usize>();
            if total_num_batches < target_partitions {
                // Batches are never divided, so we cannot reach the target.
                return Ok(None);
            }

            // Number of additional partitions to create by splitting.
            let cnt_to_repartition = target_partitions - self.partitions.len();

            // Spread the original indices so each split (idx, idx + 1) slots
            // in between neighbors without disturbing the overall order.
            let to_repartition = self
                .partitions
                .iter()
                .enumerate()
                .map(|(idx, batches)| RePartition {
                    idx: idx + (cnt_to_repartition * idx),
                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
                    batches: batches.clone(),
                })
                .collect_vec();

            // Max-heap keyed on row count: always split the largest partition.
            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
            for rep in to_repartition {
                max_heap.push(CompareByRowCount(rep));
            }

            // Single-batch partitions cannot be split; park them so they are
            // not popped again and again.
            let mut cannot_split_further = Vec::with_capacity(target_partitions);
            for _ in 0..cnt_to_repartition {
                loop {
                    let Some(to_split) = max_heap.pop() else {
                        // Nothing splittable remains.
                        break;
                    };

                    let mut new_partitions = to_split.into_inner().split();
                    if new_partitions.len() > 1 {
                        for new_partition in new_partitions {
                            max_heap.push(CompareByRowCount(new_partition));
                        }
                        // Exactly one successful split per outer iteration.
                        break;
                    } else {
                        cannot_split_further.push(new_partitions.remove(0));
                    }
                }
            }
            let mut partitions = max_heap
                .drain()
                .map(CompareByRowCount::into_inner)
                .collect_vec();
            partitions.extend(cannot_split_further);

            // Restore the original relative data order.
            partitions.sort_by_key(|p| p.idx);
            let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();

            Ok(Some(partitions))
        }
    }

    /// Redistributes all batches across `target_partitions` bins so the row
    /// counts are roughly even (first-fit decreasing). Batch order is NOT
    /// preserved. Returns `Ok(None)` when there are fewer batches than bins.
    fn repartition_evenly_by_size(
        &self,
        target_partitions: usize,
    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
        // Flatten all partitions into one list of batches.
        let mut flatten_batches =
            self.partitions.clone().into_iter().flatten().collect_vec();
        if flatten_batches.len() < target_partitions {
            return Ok(None);
        }

        // Place the largest batches first.
        let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
        flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));

        let mut partitions =
            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
        let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
        let mut total_rows_seen = 0;
        let mut curr_bin_row_count = 0;
        let mut idx = 0;
        for batch in flatten_batches {
            let row_cnt = batch.num_rows();
            // Once every bin has been visited, overflow goes to the last bin.
            idx = std::cmp::min(idx, target_partitions - 1);

            partitions[idx].push(batch);
            curr_bin_row_count += row_cnt;
            total_rows_seen += row_cnt;

            if curr_bin_row_count >= target_partition_size {
                // Bin is full: advance, and rebalance the per-bin target from
                // the rows that remain.
                idx += 1;
                curr_bin_row_count = 0;

                if total_rows_seen < total_num_rows {
                    target_partition_size = (total_num_rows - total_rows_seen)
                        .div_ceil(target_partitions - idx);
                }
            }
        }

        Ok(Some(partitions))
    }
}
600
/// A unit of repartitioning work: one output partition, its target index,
/// and a cached total row count.
struct RePartition {
    /// Output position of this partition (used to restore relative order
    /// after heap-based splitting).
    idx: usize,
    /// Total rows across all `batches`.
    row_count: usize,
    /// The batches belonging to this partition.
    batches: Vec<RecordBatch>,
}
613
614impl RePartition {
615 fn split(self) -> Vec<Self> {
619 if self.batches.len() == 1 {
620 return vec![self];
621 }
622
623 let new_0 = RePartition {
624 idx: self.idx, row_count: 0,
626 batches: vec![],
627 };
628 let new_1 = RePartition {
629 idx: self.idx + 1, row_count: 0,
631 batches: vec![],
632 };
633 let split_pt = self.row_count / 2;
634
635 let [new_0, new_1] = self.batches.into_iter().fold(
636 [new_0, new_1],
637 |[mut new0, mut new1], batch| {
638 if new0.row_count < split_pt {
639 new0.add_batch(batch);
640 } else {
641 new1.add_batch(batch);
642 }
643 [new0, new1]
644 },
645 );
646 vec![new_0, new_1]
647 }
648
649 fn add_batch(&mut self, batch: RecordBatch) {
650 self.row_count += batch.num_rows();
651 self.batches.push(batch);
652 }
653}
654
655impl fmt::Display for RePartition {
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 write!(
658 f,
659 "{}rows-in-{}batches@{}",
660 self.row_count,
661 self.batches.len(),
662 self.idx
663 )
664 }
665}
666
667struct CompareByRowCount(RePartition);
668impl CompareByRowCount {
669 fn into_inner(self) -> RePartition {
670 self.0
671 }
672}
673impl Ord for CompareByRowCount {
674 fn cmp(&self, other: &Self) -> Ordering {
675 self.0.row_count.cmp(&other.0.row_count)
676 }
677}
678impl PartialOrd for CompareByRowCount {
679 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680 Some(self.cmp(other))
681 }
682}
683impl PartialEq for CompareByRowCount {
684 fn eq(&self, other: &Self) -> bool {
685 self.cmp(other) == Ordering::Equal
687 }
688}
689impl Eq for CompareByRowCount {}
690impl Deref for CompareByRowCount {
691 type Target = RePartition;
692 fn deref(&self) -> &Self::Target {
693 &self.0
694 }
695}
696
/// One partition's batches, shared behind an async lock.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;

/// A [`DataSink`] that appends incoming batches to in-memory partitions.
pub struct MemSink {
    /// Target locations for writing data; one entry per partition.
    batches: Vec<PartitionData>,
    /// Schema of the data being written.
    schema: SchemaRef,
}
708
impl Debug for MemSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Batch contents live behind async locks; report only the partition
        // count so `Debug` never blocks.
        f.debug_struct("MemSink")
            .field("num_partitions", &self.batches.len())
            .finish()
    }
}
716
717impl DisplayAs for MemSink {
718 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
719 match t {
720 DisplayFormatType::Default | DisplayFormatType::Verbose => {
721 let partition_count = self.batches.len();
722 write!(f, "MemoryTable (partitions={partition_count})")
723 }
724 DisplayFormatType::TreeRender => {
725 write!(f, "")
727 }
728 }
729 }
730}
731
732impl MemSink {
733 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
737 if batches.is_empty() {
738 return plan_err!("Cannot insert into MemTable with zero partitions");
739 }
740 Ok(Self { batches, schema })
741 }
742}
743
#[async_trait]
impl DataSink for MemSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Drains `data`, distributing the incoming batches round-robin across
    /// the target partitions, and returns the total number of rows written.
    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let num_partitions = self.batches.len();

        // Buffer everything locally first so each partition's write lock is
        // taken only once, after the stream is fully consumed.
        let mut new_batches = vec![vec![]; num_partitions];
        let mut i = 0;
        let mut row_count = 0;
        while let Some(batch) = data.next().await.transpose()? {
            row_count += batch.num_rows();
            new_batches[i].push(batch);
            i = (i + 1) % num_partitions;
        }

        // Append the buffered batches to the shared partitions.
        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
            target.write().await.append(&mut batches);
        }

        Ok(row_count as u64)
    }
}
781
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::ExecutionPlan;

    /// Attaching two orderings must expose both through the equivalence
    /// properties, with the output ordering being their concatenation.
    #[test]
    fn test_memory_order_eq() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        // First ordering: (a, b); second ordering: (c).
        let sort1: LexOrdering = [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]
        .into();
        let sort2: LexOrdering = [PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]
        .into();
        let mut expected_output_order = sort1.clone();
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
838
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::col;
    use crate::tests::{aggr_test_schema, make_partition};

    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_plan::expressions::lit;

    use datafusion_physical_plan::ExecutionPlan;
    use futures::StreamExt;

    /// `with_fetch` must cap the number of rows produced by the stream.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        // Only the first 4 rows come back.
        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// An empty `VALUES` list is rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Building a plan node from well-formed batches succeeds.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// Zero batches is an error.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// A mismatch between the declared schema and the batches is an error.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    /// A NULL literal must be rejected for a non-nullable column, while a
    /// concrete value is accepted.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics for an all-NULL values list: exact row and null counts.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.partition_statistics(None)?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }

    /// Builds a 3-column (Int32, Utf8, Int64) batch with `row_size` rows.
    fn batch(row_size: usize) -> RecordBatch {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    }

    fn schema() -> SchemaRef {
        batch(1).schema()
    }

    // ---- fixtures: sources with various partition / batch layouts ----

    fn memorysrcconfig_no_partitions(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    fn memorysrcconfig_1_partition_1_batch(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    fn memorysrcconfig_3_partitions_1_batch_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    fn memorysrcconfig_3_partitions_with_2_batches_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// One partition whose batch sizes strictly decrease.
    fn memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// One partition whose batch order does NOT match batch size, to show
    /// order (not size) is what order-preserving repartitioning keeps.
    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    fn memorysrcconfig_2_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100_000), batch(10_000), batch(1_000)],
            vec![batch(2_000), batch(20)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Two partitions mixing one huge batch with many tiny/empty ones.
    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![
                batch(100_000),
                batch(1),
                batch(1),
                batch(1),
                batch(1),
                batch(0),
            ],
            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Asserts that repartitioning produced a source with `partition_cnt`
    /// partitions, or produced nothing when `partition_cnt` is `None`.
    fn assert_partitioning(
        partitioned_datasrc: Option<Arc<dyn DataSource>>,
        partition_cnt: Option<usize>,
    ) {
        let should_exist = if let Some(partition_cnt) = partition_cnt {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        } else {
            "new datasource should not exist".into()
        };

        let actual = partitioned_datasrc
            .map(|datasrc| datasrc.output_partitioning().partition_count());
        assert_eq!(
            actual,
            partition_cnt,
            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
        );
    }

    /// Shared scenarios run both with and without an output ordering; both
    /// repartitioning strategies must agree on when repartitioning occurs.
    fn run_all_test_scenarios(
        output_ordering: Option<LexOrdering>,
        sort_information_on_config: Vec<LexOrdering>,
    ) -> Result<()> {
        let not_used = usize::MAX;

        // No partitions: nothing to repartition.
        let mem_src_config =
            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
        let partitioned_datasrc =
            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
        assert_partitioning(partitioned_datasrc, None);

        // Already at the requested partition count.
        let target_partitions = 1;
        let mem_src_config =
            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // 3 partitions -> 3: no change needed.
        let target_partitions = 3;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Repartitioning never reduces the partition count (3 -> 2).
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // 3 partitions with only 3 batches total cannot become 4.
        let target_partitions = 4;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // 6 batches across 3 partitions can become 5 partitions.
        let target_partitions = 5;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(5));

        // ... or exactly 6 partitions (one batch each).
        let target_partitions = 6;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(6));

        // ... but not 7: batches are never divided.
        let target_partitions = 3 * 2 + 1;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // 1 partition [100k, 10k, 100, 1] -> 2: the big batch ends up alone,
        // the three smaller ones stay together (in both strategies).
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
            sort_information_on_config,
        )?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            not_used,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Inspect the actual batch placement.
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 1);

        Ok(())
    }

    /// Without an ordering, size-based redistribution is used.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

        // 2 partitions, 5 batches -> 3 partitions, binned by size
        // (largest-first).
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Verify the exact bin assignment.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 1_000);
        assert_eq!(p3[2].num_rows(), 20);

        Ok(())
    }

    /// Size-based redistribution with one huge batch and many tiny/empty
    /// ones: the per-bin target is rebalanced after each bin fills.
    #[test]
    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
    ) -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        let target_partitions = 5;
        let mem_src_config =
            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Verify the exact bin assignment (batches sorted largest-first).
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 5);
        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
        else {
            unreachable!()
        };
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 100);
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 1);
        assert_eq!(p3[1].num_rows(), 1);
        assert_eq!(p3[2].num_rows(), 1);
        assert_eq!(p4.len(), 3);
        assert_eq!(p4[0].num_rows(), 1);
        assert_eq!(p4[1].num_rows(), 1);
        assert_eq!(p4[2].num_rows(), 1);
        assert_eq!(p5.len(), 4);
        assert_eq!(p5[0].num_rows(), 1);
        assert_eq!(p5[1].num_rows(), 1);
        assert_eq!(p5[2].num_rows(), 0);
        assert_eq!(p5[3].num_rows(), 0);

        Ok(())
    }

    /// With an ordering, the order-preserving strategy splits the largest
    /// partition while keeping batch order intact.
    #[test]
    fn test_repartition_with_sort_information() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;

        // 2 partitions -> 3: partition 0 is split, partition 1 kept whole.
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Verify the exact (order-preserving) split.
        let Some(output_ord) = output_ordering else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config
            .repartition_preserving_order(target_partitions, output_ord)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 2);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 1_000);
        assert_eq!(p3.len(), 2);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 20);

        Ok(())
    }

    /// Order-preserving repartitioning must keep the original batch order
    /// even when it does not correlate with batch size.
    #[test]
    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
        let schema = schema();
        let sort_key: LexOrdering =
            [PhysicalSortExpr::new_default(col("c", &schema)?)].into();
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // 1 partition [100k, 1, 100, 10k] -> 2 partitions.
        let target_partitions = 2;
        let mem_src_config =
            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Batch order, not batch size, determines placement.
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 1);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 10_000);

        Ok(())
    }
}