1use std::any::Any;
19use std::cmp::Ordering;
20use std::collections::BinaryHeap;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use crate::sink::DataSink;
26use crate::source::{DataSource, DataSourceExec};
27use async_trait::async_trait;
28use datafusion_physical_plan::memory::MemoryStream;
29use datafusion_physical_plan::projection::{
30 all_alias_free_columns, new_projections_for_columns, ProjectionExec,
31};
32use datafusion_physical_plan::{
33 common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
34 PhysicalExpr, SendableRecordBatchStream, Statistics,
35};
36
37use arrow::array::{RecordBatch, RecordBatchOptions};
38use arrow::datatypes::{Schema, SchemaRef};
39use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::equivalence::ProjectionMapping;
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::utils::collect_columns;
44use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
45use futures::StreamExt;
46use itertools::Itertools;
47use tokio::sync::RwLock;
48
/// Data source configuration for reading in-memory [`RecordBatch`]es.
///
/// Each element of `partitions` is served as one output partition.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to read; each inner `Vec` holds one partition's batches.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the stored batches, before `projection` is applied.
    schema: SchemaRef,
    /// `schema` with `projection` applied; this is the schema of the output.
    projected_schema: SchemaRef,
    /// Optional column indices (into `schema`) to output.
    projection: Option<Vec<usize>>,
    /// Orderings the data is known to satisfy, expressed against
    /// `projected_schema`.
    sort_information: Vec<LexOrdering>,
    /// Whether `partition_sizes` is included in the default/verbose display.
    show_sizes: bool,
    /// Maximum number of rows read from each partition's stream; `None`
    /// reads every row.
    fetch: Option<usize>,
}
70
71impl DataSource for MemorySourceConfig {
72 fn open(
73 &self,
74 partition: usize,
75 _context: Arc<TaskContext>,
76 ) -> Result<SendableRecordBatchStream> {
77 Ok(Box::pin(
78 MemoryStream::try_new(
79 self.partitions[partition].clone(),
80 Arc::clone(&self.projected_schema),
81 self.projection.clone(),
82 )?
83 .with_fetch(self.fetch),
84 ))
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90
91 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
92 match t {
93 DisplayFormatType::Default | DisplayFormatType::Verbose => {
94 let partition_sizes: Vec<_> =
95 self.partitions.iter().map(|b| b.len()).collect();
96
97 let output_ordering = self
98 .sort_information
99 .first()
100 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
101 .unwrap_or_default();
102
103 let eq_properties = self.eq_properties();
104 let constraints = eq_properties.constraints();
105 let constraints = if constraints.is_empty() {
106 String::new()
107 } else {
108 format!(", {constraints}")
109 };
110
111 let limit = self
112 .fetch
113 .map_or(String::new(), |limit| format!(", fetch={limit}"));
114 if self.show_sizes {
115 write!(
116 f,
117 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
118 partition_sizes.len(),
119 )
120 } else {
121 write!(
122 f,
123 "partitions={}{limit}{output_ordering}{constraints}",
124 partition_sizes.len(),
125 )
126 }
127 }
128 DisplayFormatType::TreeRender => {
129 let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
130 let total_bytes: usize = self
131 .partitions
132 .iter()
133 .flatten()
134 .map(|batch| batch.get_array_memory_size())
135 .sum();
136 writeln!(f, "format=memory")?;
137 writeln!(f, "rows={total_rows}")?;
138 writeln!(f, "bytes={total_bytes}")?;
139 Ok(())
140 }
141 }
142 }
143
144 fn repartitioned(
149 &self,
150 target_partitions: usize,
151 _repartition_file_min_size: usize,
152 output_ordering: Option<LexOrdering>,
153 ) -> Result<Option<Arc<dyn DataSource>>> {
154 if self.partitions.is_empty() || self.partitions.len() >= target_partitions
155 {
157 return Ok(None);
158 }
159
160 let maybe_repartitioned = if let Some(output_ordering) = output_ordering {
161 self.repartition_preserving_order(target_partitions, output_ordering)?
162 } else {
163 self.repartition_evenly_by_size(target_partitions)?
164 };
165
166 if let Some(repartitioned) = maybe_repartitioned {
167 Ok(Some(Arc::new(Self::try_new(
168 &repartitioned,
169 self.original_schema(),
170 self.projection.clone(),
171 )?)))
172 } else {
173 Ok(None)
174 }
175 }
176
177 fn output_partitioning(&self) -> Partitioning {
178 Partitioning::UnknownPartitioning(self.partitions.len())
179 }
180
181 fn eq_properties(&self) -> EquivalenceProperties {
182 EquivalenceProperties::new_with_orderings(
183 Arc::clone(&self.projected_schema),
184 self.sort_information.as_slice(),
185 )
186 }
187
188 fn statistics(&self) -> Result<Statistics> {
189 Ok(common::compute_record_batch_statistics(
190 &self.partitions,
191 &self.schema,
192 self.projection.clone(),
193 ))
194 }
195
196 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
197 let source = self.clone();
198 Some(Arc::new(source.with_limit(limit)))
199 }
200
201 fn fetch(&self) -> Option<usize> {
202 self.fetch
203 }
204
205 fn try_swapping_with_projection(
206 &self,
207 projection: &ProjectionExec,
208 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
209 all_alias_free_columns(projection.expr())
212 .then(|| {
213 let all_projections = (0..self.schema.fields().len()).collect();
214 let new_projections = new_projections_for_columns(
215 projection,
216 self.projection().as_ref().unwrap_or(&all_projections),
217 );
218
219 MemorySourceConfig::try_new_exec(
220 self.partitions(),
221 self.original_schema(),
222 Some(new_projections),
223 )
224 .map(|e| e as _)
225 })
226 .transpose()
227 }
228}
229
230impl MemorySourceConfig {
231 pub fn try_new(
234 partitions: &[Vec<RecordBatch>],
235 schema: SchemaRef,
236 projection: Option<Vec<usize>>,
237 ) -> Result<Self> {
238 let projected_schema = project_schema(&schema, projection.as_ref())?;
239 Ok(Self {
240 partitions: partitions.to_vec(),
241 schema,
242 projected_schema,
243 projection,
244 sort_information: vec![],
245 show_sizes: true,
246 fetch: None,
247 })
248 }
249
250 pub fn try_new_exec(
253 partitions: &[Vec<RecordBatch>],
254 schema: SchemaRef,
255 projection: Option<Vec<usize>>,
256 ) -> Result<Arc<DataSourceExec>> {
257 let source = Self::try_new(partitions, schema, projection)?;
258 Ok(DataSourceExec::from_data_source(source))
259 }
260
261 pub fn try_new_as_values(
263 schema: SchemaRef,
264 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
265 ) -> Result<Arc<DataSourceExec>> {
266 if data.is_empty() {
267 return plan_err!("Values list cannot be empty");
268 }
269
270 let n_row = data.len();
271 let n_col = schema.fields().len();
272
273 let placeholder_schema = Arc::new(Schema::empty());
276 let placeholder_batch = RecordBatch::try_new_with_options(
277 Arc::clone(&placeholder_schema),
278 vec![],
279 &RecordBatchOptions::new().with_row_count(Some(1)),
280 )?;
281
282 let arrays = (0..n_col)
284 .map(|j| {
285 (0..n_row)
286 .map(|i| {
287 let expr = &data[i][j];
288 let result = expr.evaluate(&placeholder_batch)?;
289
290 match result {
291 ColumnarValue::Scalar(scalar) => Ok(scalar),
292 ColumnarValue::Array(array) if array.len() == 1 => {
293 ScalarValue::try_from_array(&array, 0)
294 }
295 ColumnarValue::Array(_) => {
296 plan_err!("Cannot have array values in a values list")
297 }
298 }
299 })
300 .collect::<Result<Vec<_>>>()
301 .and_then(ScalarValue::iter_to_array)
302 })
303 .collect::<Result<Vec<_>>>()?;
304
305 let batch = RecordBatch::try_new_with_options(
306 Arc::clone(&schema),
307 arrays,
308 &RecordBatchOptions::new().with_row_count(Some(n_row)),
309 )?;
310
311 let partitions = vec![batch];
312 Self::try_new_from_batches(Arc::clone(&schema), partitions)
313 }
314
315 pub fn try_new_from_batches(
320 schema: SchemaRef,
321 batches: Vec<RecordBatch>,
322 ) -> Result<Arc<DataSourceExec>> {
323 if batches.is_empty() {
324 return plan_err!("Values list cannot be empty");
325 }
326
327 for batch in &batches {
328 let batch_schema = batch.schema();
329 if batch_schema != schema {
330 return plan_err!(
331 "Batch has invalid schema. Expected: {}, got: {}",
332 schema,
333 batch_schema
334 );
335 }
336 }
337
338 let partitions = vec![batches];
339 let source = Self {
340 partitions,
341 schema: Arc::clone(&schema),
342 projected_schema: Arc::clone(&schema),
343 projection: None,
344 sort_information: vec![],
345 show_sizes: true,
346 fetch: None,
347 };
348 Ok(DataSourceExec::from_data_source(source))
349 }
350
351 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
353 self.fetch = limit;
354 self
355 }
356
357 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
359 self.show_sizes = show_sizes;
360 self
361 }
362
363 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
365 &self.partitions
366 }
367
368 pub fn projection(&self) -> &Option<Vec<usize>> {
370 &self.projection
371 }
372
373 pub fn show_sizes(&self) -> bool {
375 self.show_sizes
376 }
377
378 pub fn sort_information(&self) -> &[LexOrdering] {
380 &self.sort_information
381 }
382
383 pub fn try_with_sort_information(
403 mut self,
404 mut sort_information: Vec<LexOrdering>,
405 ) -> Result<Self> {
406 let fields = self.schema.fields();
408 let ambiguous_column = sort_information
409 .iter()
410 .flat_map(|ordering| ordering.clone())
411 .flat_map(|expr| collect_columns(&expr.expr))
412 .find(|col| {
413 fields
414 .get(col.index())
415 .map(|field| field.name() != col.name())
416 .unwrap_or(true)
417 });
418 if let Some(col) = ambiguous_column {
419 return internal_err!(
420 "Column {:?} is not found in the original schema of the MemorySourceConfig",
421 col
422 );
423 }
424
425 if let Some(projection) = &self.projection {
427 let base_eqp = EquivalenceProperties::new_with_orderings(
428 self.original_schema(),
429 &sort_information,
430 );
431 let proj_exprs = projection
432 .iter()
433 .map(|idx| {
434 let base_schema = self.original_schema();
435 let name = base_schema.field(*idx).name();
436 (Arc::new(Column::new(name, *idx)) as _, name.to_string())
437 })
438 .collect::<Vec<_>>();
439 let projection_mapping =
440 ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
441 sort_information = base_eqp
442 .project(&projection_mapping, Arc::clone(&self.projected_schema))
443 .into_oeq_class()
444 .into_inner();
445 }
446
447 self.sort_information = sort_information;
448 Ok(self)
449 }
450
451 pub fn original_schema(&self) -> SchemaRef {
453 Arc::clone(&self.schema)
454 }
455
456 fn repartition_preserving_order(
462 &self,
463 target_partitions: usize,
464 output_ordering: LexOrdering,
465 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
466 if !self.eq_properties().ordering_satisfy(&output_ordering) {
467 Ok(None)
468 } else {
469 let total_num_batches =
470 self.partitions.iter().map(|b| b.len()).sum::<usize>();
471 if total_num_batches < target_partitions {
472 return Ok(None);
474 }
475
476 let cnt_to_repartition = target_partitions - self.partitions.len();
477
478 let to_repartition = self
481 .partitions
482 .iter()
483 .enumerate()
484 .map(|(idx, batches)| RePartition {
485 idx: idx + (cnt_to_repartition * idx), row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
487 batches: batches.clone(),
488 })
489 .collect_vec();
490
491 let mut max_heap = BinaryHeap::with_capacity(target_partitions);
494 for rep in to_repartition {
495 max_heap.push(rep);
496 }
497
498 let mut cannot_split_further = Vec::with_capacity(target_partitions);
501 for _ in 0..cnt_to_repartition {
502 loop {
504 let Some(to_split) = max_heap.pop() else {
506 break;
508 };
509
510 let mut new_partitions = to_split.split();
512 if new_partitions.len() > 1 {
513 for new_partition in new_partitions {
514 max_heap.push(new_partition);
515 }
516 break;
518 } else {
519 cannot_split_further.push(new_partitions.remove(0));
520 }
521 }
522 }
523 let mut partitions = max_heap.drain().collect_vec();
524 partitions.extend(cannot_split_further);
525
526 partitions.sort_by_key(|p| p.idx);
529 let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
530
531 Ok(Some(partitions))
532 }
533 }
534
535 fn repartition_evenly_by_size(
544 &self,
545 target_partitions: usize,
546 ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
547 let mut flatten_batches =
549 self.partitions.clone().into_iter().flatten().collect_vec();
550 if flatten_batches.len() < target_partitions {
551 return Ok(None);
552 }
553
554 let total_num_rows = flatten_batches.iter().map(|b| b.num_rows()).sum::<usize>();
556 flatten_batches.sort_by_key(|b| std::cmp::Reverse(b.num_rows()));
558
559 let mut partitions =
561 vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
562 let mut target_partition_size = total_num_rows.div_ceil(target_partitions);
563 let mut total_rows_seen = 0;
564 let mut curr_bin_row_count = 0;
565 let mut idx = 0;
566 for batch in flatten_batches {
567 let row_cnt = batch.num_rows();
568 idx = std::cmp::min(idx, target_partitions - 1);
569
570 partitions[idx].push(batch);
571 curr_bin_row_count += row_cnt;
572 total_rows_seen += row_cnt;
573
574 if curr_bin_row_count >= target_partition_size {
575 idx += 1;
576 curr_bin_row_count = 0;
577
578 if total_rows_seen < total_num_rows {
581 target_partition_size = (total_num_rows - total_rows_seen)
582 .div_ceil(target_partitions - idx);
583 }
584 }
585 }
586
587 Ok(Some(partitions))
588 }
589}
590
/// A candidate partition used by
/// `MemorySourceConfig::repartition_preserving_order` while splitting.
struct RePartition {
    /// Output position; partitions are sorted by this after splitting.
    idx: usize,
    /// Total rows across `batches`; the split heap orders partitions by it.
    row_count: usize,
    /// This partition's batches, in their original order.
    batches: Vec<RecordBatch>,
}
603
604impl RePartition {
605 fn split(self) -> Vec<Self> {
609 if self.batches.len() == 1 {
610 return vec![self];
611 }
612
613 let new_0 = RePartition {
614 idx: self.idx, row_count: 0,
616 batches: vec![],
617 };
618 let new_1 = RePartition {
619 idx: self.idx + 1, row_count: 0,
621 batches: vec![],
622 };
623 let split_pt = self.row_count / 2;
624
625 let [new_0, new_1] = self.batches.into_iter().fold(
626 [new_0, new_1],
627 |[mut new0, mut new1], batch| {
628 if new0.row_count < split_pt {
629 new0.add_batch(batch);
630 } else {
631 new1.add_batch(batch);
632 }
633 [new0, new1]
634 },
635 );
636 vec![new_0, new_1]
637 }
638
639 fn add_batch(&mut self, batch: RecordBatch) {
640 self.row_count += batch.num_rows();
641 self.batches.push(batch);
642 }
643}
644
645impl PartialOrd for RePartition {
646 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
647 Some(self.row_count.cmp(&other.row_count))
648 }
649}
650
651impl Ord for RePartition {
652 fn cmp(&self, other: &Self) -> Ordering {
653 self.row_count.cmp(&other.row_count)
654 }
655}
656
657impl PartialEq for RePartition {
658 fn eq(&self, other: &Self) -> bool {
659 self.row_count.eq(&other.row_count)
660 }
661}
662
/// Marker impl: equality (row count only, per the `PartialEq` impl) is a full
/// equivalence relation, which `Ord` requires.
impl Eq for RePartition {}
664
665impl fmt::Display for RePartition {
666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 write!(
668 f,
669 "{}rows-in-{}batches@{}",
670 self.row_count,
671 self.batches.len(),
672 self.idx
673 )
674 }
675}
676
/// The batches of one in-memory partition, shared behind a tokio `RwLock` so
/// that a [`MemSink`] can append to them.
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
679
/// Data sink that appends incoming batches to shared in-memory partitions.
pub struct MemSink {
    /// Target partitions; new batches are appended under each write lock.
    batches: Vec<PartitionData>,
    /// Schema of the data accepted by this sink.
    schema: SchemaRef,
}
688
689impl Debug for MemSink {
690 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691 f.debug_struct("MemSink")
692 .field("num_partitions", &self.batches.len())
693 .finish()
694 }
695}
696
697impl DisplayAs for MemSink {
698 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699 match t {
700 DisplayFormatType::Default | DisplayFormatType::Verbose => {
701 let partition_count = self.batches.len();
702 write!(f, "MemoryTable (partitions={partition_count})")
703 }
704 DisplayFormatType::TreeRender => {
705 write!(f, "")
707 }
708 }
709 }
710}
711
712impl MemSink {
713 pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
717 if batches.is_empty() {
718 return plan_err!("Cannot insert into MemTable with zero partitions");
719 }
720 Ok(Self { batches, schema })
721 }
722}
723
724#[async_trait]
725impl DataSink for MemSink {
726 fn as_any(&self) -> &dyn Any {
727 self
728 }
729
730 fn schema(&self) -> &SchemaRef {
731 &self.schema
732 }
733
734 async fn write_all(
735 &self,
736 mut data: SendableRecordBatchStream,
737 _context: &Arc<TaskContext>,
738 ) -> Result<u64> {
739 let num_partitions = self.batches.len();
740
741 let mut new_batches = vec![vec![]; num_partitions];
744 let mut i = 0;
745 let mut row_count = 0;
746 while let Some(batch) = data.next().await.transpose()? {
747 row_count += batch.num_rows();
748 new_batches[i].push(batch);
749 i = (i + 1) % num_partitions;
750 }
751
752 for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
754 target.write().await.append(&mut batches);
756 }
757
758 Ok(row_count as u64)
759 }
760}
761
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    /// Two orderings registered on a memory source: the reported output
    /// ordering is their concatenation, and each ordering is individually
    /// present in the ordering equivalence class.
    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(
            ["a", "b", "c"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Int64, false))
                .collect::<Vec<_>>(),
        ));

        // Default-options sort key on a single named column.
        let sort_key = |name: &str| -> datafusion_common::Result<PhysicalSortExpr> {
            Ok(PhysicalSortExpr {
                expr: col(name, &schema)?,
                options: SortOptions::default(),
            })
        };
        let sort1 = LexOrdering::new(vec![sort_key("a")?, sort_key("b")?]);
        let sort2 = LexOrdering::new(vec![sort_key("c")?]);

        let mut expected_output_order = LexOrdering::default();
        for ordering in [&sort1, &sort2] {
            expected_output_order.extend(ordering.clone());
        }

        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(vec![sort1.clone(), sort2.clone()])?;
        let mem_exec = DataSourceExec::from_data_source(source);

        let properties = mem_exec.properties();
        assert_eq!(properties.output_ordering().unwrap(), &expected_output_order);
        let oeq_class = properties.equivalence_properties().oeq_class();
        assert!(oeq_class.contains(&sort1));
        assert!(oeq_class.contains(&sort2));
        Ok(())
    }
}
817
#[cfg(test)]
mod tests {
    use crate::test_util::col;
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    /// A fetch set through `with_fetch` is reported by the plan and limits
    /// the rows produced when executing partition 0.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        // Only the first 4 rows come back.
        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// An empty VALUES list is rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Batches whose schema matches the given schema are accepted.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// An empty batch list is rejected.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// Batches whose schema differs from the given schema are rejected.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    /// A non-nullable column accepts a non-null literal but rejects NULL.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // A NULL literal must be rejected for the non-nullable column.
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics of a VALUES list made only of NULLs report every row as null.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.partition_statistics(None)?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    // every value in the column is NULL
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }

    /// Builds a batch of `row_size` rows with columns `a` (Int32), `b` (Utf8)
    /// and `c` (Int64).
    fn batch(row_size: usize) -> RecordBatch {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    }

    /// Schema shared by every batch produced by [`batch`].
    fn schema() -> SchemaRef {
        batch(1).schema()
    }

    // Fixture: no partitions at all.
    fn memorysrcconfig_no_partitions(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: one partition holding a single 100-row batch.
    fn memorysrcconfig_1_partition_1_batch(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: three partitions, each holding a single 100-row batch.
    fn memorysrcconfig_3_partitions_1_batch_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: three partitions, each holding two 100-row batches (6 batches).
    fn memorysrcconfig_3_partitions_with_2_batches_each(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
            vec![batch(100), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: one partition whose batches are already in descending size
    // order, so size-sorting and original order coincide.
    fn memorysrcconfig_1_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: one partition whose batch order differs from size order, to
    // check that order-preserving repartitioning keeps the original order.
    fn memorysrcconfig_1_partition_with_ordering_not_matching_size(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![vec![batch(100_000), batch(1), batch(100), batch(10_000)]];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: two partitions with batches of varied sizes.
    fn memorysrcconfig_2_partition_with_different_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![batch(100_000), batch(10_000), batch(1_000)],
            vec![batch(2_000), batch(20)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    // Fixture: two partitions dominated by one huge batch, plus many tiny and
    // empty batches.
    fn memorysrcconfig_2_partition_with_extreme_sized_batches(
        sort_information: Vec<LexOrdering>,
    ) -> Result<MemorySourceConfig> {
        let partitions = vec![
            vec![
                batch(100_000),
                batch(1),
                batch(1),
                batch(1),
                batch(1),
                batch(0),
            ],
            vec![batch(1), batch(1), batch(1), batch(1), batch(0), batch(100)],
        ];
        MemorySourceConfig::try_new(&partitions, schema(), None)?
            .try_with_sort_information(sort_information)
    }

    /// Asserts that `partitioned_datasrc` exists with `partition_cnt`
    /// partitions, or that it does not exist when `partition_cnt` is `None`.
    fn assert_partitioning(
        partitioned_datasrc: Option<Arc<dyn DataSource>>,
        partition_cnt: Option<usize>,
    ) {
        let should_exist = if let Some(partition_cnt) = partition_cnt {
            format!("new datasource should exist and have {partition_cnt:?} partitions")
        } else {
            "new datasource should not exist".into()
        };

        let actual = partitioned_datasrc
            .map(|datasrc| datasrc.output_partitioning().partition_count());
        assert_eq!(
            actual,
            partition_cnt,
            "partitioned datasrc does not match expected, we expected {should_exist}, instead found {actual:?}"
        );
    }

    /// Scenarios whose expected partition counts are the same with and
    /// without an output ordering.
    fn run_all_test_scenarios(
        output_ordering: Option<LexOrdering>,
        sort_information_on_config: Vec<LexOrdering>,
    ) -> Result<()> {
        // `repartition_file_min_size` is ignored by `MemorySourceConfig`.
        let not_used = usize::MAX;

        // No partitions: nothing to repartition.
        let mem_src_config =
            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
        let partitioned_datasrc =
            mem_src_config.repartitioned(1, not_used, output_ordering.clone())?;
        assert_partitioning(partitioned_datasrc, None);

        // Already have as many partitions as requested.
        let target_partitions = 1;
        let mem_src_config =
            memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Already have as many partitions as requested.
        let target_partitions = 3;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Already have more partitions than requested.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // Only 3 batches in total, so 4 partitions cannot be formed.
        let target_partitions = 4;
        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // 6 batches in total: 5 partitions can be formed.
        let target_partitions = 5;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(5));

        // 6 batches in total: 6 partitions (one batch each) can be formed.
        let target_partitions = 6;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, Some(6));

        // One more partition than there are batches: not possible.
        let target_partitions = 3 * 2 + 1;
        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
            sort_information_on_config.clone(),
        )?;
        let partitioned_datasrc = mem_src_config.repartitioned(
            target_partitions,
            not_used,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc, None);

        // One partition with descending batch sizes, split into 2: the
        // largest batch ends up alone and the rest stay together.
        let target_partitions = 2;
        let mem_src_config = memorysrcconfig_1_partition_with_different_sized_batches(
            sort_information_on_config,
        )?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            not_used,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Inspect the batches inside the repartitioned source.
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First partition: just the 100_000-row batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the remaining batches, in their original order.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 1);

        Ok(())
    }

    #[test]
    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // Common scenarios.
        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;

        // Without an output ordering, batches from different partitions may
        // be regrouped purely by size.
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        // Check the batch placement directly via the size-based helper.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First partition: the largest batch alone.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the next largest batch alone.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 10_000);
        // Third partition: the remaining batches, largest first.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 1_000);
        assert_eq!(p3[2].num_rows(), 20);

        Ok(())
    }

    #[test]
    fn test_repartition_no_sort_information_no_output_ordering_lopsized_batches(
    ) -> Result<()> {
        let no_sort = vec![];
        let no_output_ordering = None;

        // One huge batch plus many tiny/empty ones: after the huge batch fills
        // the first partition, the target size is recomputed from the rows
        // left so the small batches are still spread over the rest.
        let target_partitions = 5;
        let mem_src_config =
            memorysrcconfig_2_partition_with_extreme_sized_batches(no_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            no_output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(5));
        // Check the batch placement directly via the size-based helper.
        let repartitioned_raw_batches = mem_src_config
            .repartition_evenly_by_size(target_partitions)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 5);
        let [ref p1, ref p2, ref p3, ref p4, ref p5] = repartitioned_raw_batches[..]
        else {
            unreachable!()
        };
        // First partition: the 100_000-row batch alone.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the 100-row batch alone.
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].num_rows(), 100);
        // Third partition: three 1-row batches.
        assert_eq!(p3.len(), 3);
        assert_eq!(p3[0].num_rows(), 1);
        assert_eq!(p3[1].num_rows(), 1);
        assert_eq!(p3[2].num_rows(), 1);
        // Fourth partition: three 1-row batches.
        assert_eq!(p4.len(), 3);
        assert_eq!(p4[0].num_rows(), 1);
        assert_eq!(p4[1].num_rows(), 1);
        assert_eq!(p4[2].num_rows(), 1);
        // Fifth partition: the last two 1-row batches and both empty batches.
        assert_eq!(p5.len(), 4);
        assert_eq!(p5[0].num_rows(), 1);
        assert_eq!(p5[1].num_rows(), 1);
        assert_eq!(p5[2].num_rows(), 0);
        assert_eq!(p5[3].num_rows(), 0);

        Ok(())
    }

    #[test]
    fn test_repartition_with_sort_information() -> Result<()> {
        let schema = schema();
        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema).unwrap(),
            options: SortOptions::default(),
        }]);
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // Common scenarios.
        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;

        // With an output ordering, partitions are only split internally:
        // batches never move between the original partitions.
        let target_partitions = 3;
        let mem_src_config =
            memorysrcconfig_2_partition_with_different_sized_batches(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering.clone(),
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(3));
        let Some(output_ord) = output_ordering else {
            unreachable!()
        };
        // Check the batch placement directly via the order-preserving helper.
        let repartitioned_raw_batches = mem_src_config
            .repartition_preserving_order(target_partitions, output_ord)?
            .unwrap();
        assert_eq!(repartitioned_raw_batches.len(), 3);
        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First original partition, split into [100_000] and [10_000, 1_000].
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        assert_eq!(p2.len(), 2);
        assert_eq!(p2[0].num_rows(), 10_000);
        assert_eq!(p2[1].num_rows(), 1_000);
        // Second original partition, left intact.
        assert_eq!(p3.len(), 2);
        assert_eq!(p3[0].num_rows(), 2_000);
        assert_eq!(p3[1].num_rows(), 20);

        Ok(())
    }

    #[test]
    fn test_repartition_with_batch_ordering_not_matching_sizing() -> Result<()> {
        let schema = schema();
        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema).unwrap(),
            options: SortOptions::default(),
        }]);
        let has_sort = vec![sort_key.clone()];
        let output_ordering = Some(sort_key);

        // Batch order differs from size order; the split must keep the
        // original batch order.
        let target_partitions = 2;
        let mem_src_config =
            memorysrcconfig_1_partition_with_ordering_not_matching_size(has_sort)?;
        let partitioned_datasrc = mem_src_config.clone().repartitioned(
            target_partitions,
            usize::MAX,
            output_ordering,
        )?;
        assert_partitioning(partitioned_datasrc.clone(), Some(2));
        // Inspect the batches inside the repartitioned source.
        let partitioned_datasrc = partitioned_datasrc.unwrap();
        let Some(mem_src_config) = partitioned_datasrc
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
        else {
            unreachable!()
        };
        let repartitioned_raw_batches = mem_src_config.partitions.clone();
        assert_eq!(repartitioned_raw_batches.len(), 2);
        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
            unreachable!()
        };
        // First partition: the leading 100_000-row batch.
        assert_eq!(p1.len(), 1);
        assert_eq!(p1[0].num_rows(), 100_000);
        // Second partition: the remaining batches in their original order.
        assert_eq!(p2.len(), 3);
        assert_eq!(p2[0].num_rows(), 1);
        assert_eq!(p2[1].num_rows(), 100);
        assert_eq!(p2[2].num_rows(), 10_000);

        Ok(())
    }
}