1use std::any::Any;
21use std::fmt;
22use std::sync::Arc;
23
24use crate::source::{DataSource, DataSourceExec};
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::memory::MemoryStream;
27use datafusion_physical_plan::projection::{
28 all_alias_free_columns, new_projections_for_columns, ProjectionExec,
29};
30use datafusion_physical_plan::{
31 common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
32 PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
33};
34
35use arrow::array::{RecordBatch, RecordBatchOptions};
36use arrow::datatypes::{Schema, SchemaRef};
37use datafusion_common::{
38 internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
39};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::equivalence::ProjectionMapping;
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::utils::collect_columns;
44use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
45
/// Execution plan for reading in-memory batches of data.
///
/// Thin wrapper around a [`DataSourceExec`] driven by a `MemorySourceConfig`;
/// kept only for backwards compatibility.
#[derive(Clone)]
#[deprecated(
    since = "46.0.0",
    note = "use MemorySourceConfig and DataSourceExec instead"
)]
pub struct MemoryExec {
    /// The wrapped `DataSourceExec` that actually produces the data.
    inner: DataSourceExec,
    /// The partitions to query.
    partitions: Vec<Vec<RecordBatch>>,
    /// Optional projection: column indices into the original schema.
    projection: Option<Vec<usize>>,
    /// Known orderings of the data, if any.
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes should be displayed in plan output.
    show_sizes: bool,
}
63
64#[allow(unused, deprecated)]
65impl fmt::Debug for MemoryExec {
66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67 self.inner.fmt_as(DisplayFormatType::Default, f)
68 }
69}
70
71#[allow(unused, deprecated)]
72impl DisplayAs for MemoryExec {
73 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
74 self.inner.fmt_as(t, f)
75 }
76}
77
78#[allow(unused, deprecated)]
79impl ExecutionPlan for MemoryExec {
80 fn name(&self) -> &'static str {
81 "MemoryExec"
82 }
83
84 fn as_any(&self) -> &dyn Any {
86 self
87 }
88
89 fn properties(&self) -> &PlanProperties {
90 self.inner.properties()
91 }
92
93 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
94 vec![]
96 }
97
98 fn with_new_children(
99 self: Arc<Self>,
100 children: Vec<Arc<dyn ExecutionPlan>>,
101 ) -> Result<Arc<dyn ExecutionPlan>> {
102 if children.is_empty() {
104 Ok(self)
105 } else {
106 internal_err!("Children cannot be replaced in {self:?}")
107 }
108 }
109
110 fn execute(
111 &self,
112 partition: usize,
113 context: Arc<TaskContext>,
114 ) -> Result<SendableRecordBatchStream> {
115 self.inner.execute(partition, context)
116 }
117
118 fn statistics(&self) -> Result<Statistics> {
120 self.inner.statistics()
121 }
122
123 fn try_swapping_with_projection(
124 &self,
125 projection: &ProjectionExec,
126 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
127 self.inner.try_swapping_with_projection(projection)
128 }
129}
130
131#[allow(unused, deprecated)]
132impl MemoryExec {
133 pub fn try_new(
136 partitions: &[Vec<RecordBatch>],
137 schema: SchemaRef,
138 projection: Option<Vec<usize>>,
139 ) -> Result<Self> {
140 let source = MemorySourceConfig::try_new(partitions, schema, projection.clone())?;
141 let data_source = DataSourceExec::new(Arc::new(source));
142 Ok(Self {
143 inner: data_source,
144 partitions: partitions.to_vec(),
145 projection,
146 sort_information: vec![],
147 show_sizes: true,
148 })
149 }
150
151 pub fn try_new_as_values(
153 schema: SchemaRef,
154 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
155 ) -> Result<Self> {
156 if data.is_empty() {
157 return plan_err!("Values list cannot be empty");
158 }
159
160 let n_row = data.len();
161 let n_col = schema.fields().len();
162
163 let placeholder_schema = Arc::new(Schema::empty());
166 let placeholder_batch = RecordBatch::try_new_with_options(
167 Arc::clone(&placeholder_schema),
168 vec![],
169 &RecordBatchOptions::new().with_row_count(Some(1)),
170 )?;
171
172 let arrays = (0..n_col)
174 .map(|j| {
175 (0..n_row)
176 .map(|i| {
177 let expr = &data[i][j];
178 let result = expr.evaluate(&placeholder_batch)?;
179
180 match result {
181 ColumnarValue::Scalar(scalar) => Ok(scalar),
182 ColumnarValue::Array(array) if array.len() == 1 => {
183 ScalarValue::try_from_array(&array, 0)
184 }
185 ColumnarValue::Array(_) => {
186 plan_err!("Cannot have array values in a values list")
187 }
188 }
189 })
190 .collect::<Result<Vec<_>>>()
191 .and_then(ScalarValue::iter_to_array)
192 })
193 .collect::<Result<Vec<_>>>()?;
194
195 let batch = RecordBatch::try_new_with_options(
196 Arc::clone(&schema),
197 arrays,
198 &RecordBatchOptions::new().with_row_count(Some(n_row)),
199 )?;
200
201 let partitions = vec![batch];
202 Self::try_new_from_batches(Arc::clone(&schema), partitions)
203 }
204
205 pub fn try_new_from_batches(
210 schema: SchemaRef,
211 batches: Vec<RecordBatch>,
212 ) -> Result<Self> {
213 if batches.is_empty() {
214 return plan_err!("Values list cannot be empty");
215 }
216
217 for batch in &batches {
218 let batch_schema = batch.schema();
219 if batch_schema != schema {
220 return plan_err!(
221 "Batch has invalid schema. Expected: {}, got: {}",
222 schema,
223 batch_schema
224 );
225 }
226 }
227
228 let partitions = vec![batches];
229 let source = MemorySourceConfig {
230 partitions: partitions.clone(),
231 schema: Arc::clone(&schema),
232 projected_schema: Arc::clone(&schema),
233 projection: None,
234 sort_information: vec![],
235 show_sizes: true,
236 fetch: None,
237 };
238 let data_source = DataSourceExec::new(Arc::new(source));
239 Ok(Self {
240 inner: data_source,
241 partitions,
242 projection: None,
243 sort_information: vec![],
244 show_sizes: true,
245 })
246 }
247
248 fn memory_source_config(&self) -> MemorySourceConfig {
249 self.inner
250 .data_source()
251 .as_any()
252 .downcast_ref::<MemorySourceConfig>()
253 .unwrap()
254 .clone()
255 }
256
257 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
258 self.inner = self.inner.with_constraints(constraints);
259 self
260 }
261
262 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
264 let mut memory_source = self.memory_source_config();
265 memory_source.show_sizes = show_sizes;
266 self.show_sizes = show_sizes;
267 self.inner = DataSourceExec::new(Arc::new(memory_source));
268 self
269 }
270
271 pub fn constraints(&self) -> &Constraints {
273 self.properties().equivalence_properties().constraints()
274 }
275
276 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
278 &self.partitions
279 }
280
281 pub fn projection(&self) -> &Option<Vec<usize>> {
283 &self.projection
284 }
285
286 pub fn show_sizes(&self) -> bool {
288 self.show_sizes
289 }
290
291 pub fn sort_information(&self) -> &[LexOrdering] {
293 &self.sort_information
294 }
295
296 pub fn try_with_sort_information(
316 mut self,
317 sort_information: Vec<LexOrdering>,
318 ) -> Result<Self> {
319 self.sort_information = sort_information.clone();
320 let mut memory_source = self.memory_source_config();
321 memory_source = memory_source.try_with_sort_information(sort_information)?;
322 self.inner = DataSourceExec::new(Arc::new(memory_source));
323 Ok(self)
324 }
325
326 pub fn original_schema(&self) -> SchemaRef {
328 Arc::clone(&self.inner.schema())
329 }
330
331 fn compute_properties(
333 schema: SchemaRef,
334 orderings: &[LexOrdering],
335 constraints: Constraints,
336 partitions: &[Vec<RecordBatch>],
337 ) -> PlanProperties {
338 PlanProperties::new(
339 EquivalenceProperties::new_with_orderings(schema, orderings)
340 .with_constraints(constraints),
341 Partitioning::UnknownPartitioning(partitions.len()),
342 EmissionType::Incremental,
343 Boundedness::Bounded,
344 )
345 }
346}
347
/// Data source configuration for reading in-memory batches of data.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before the optional projection is applied.
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied.
    projected_schema: SchemaRef,
    /// Optional projection: column indices into `schema`.
    projection: Option<Vec<usize>>,
    /// Known orderings of the (projected) data, if any.
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes should be displayed in plan output.
    show_sizes: bool,
    /// Maximum number of records to read from this source; `None` means all.
    fetch: Option<usize>,
}
367
impl DataSource for MemorySourceConfig {
    /// Returns a stream over the batches of `partition`, applying the
    /// configured projection and `fetch` limit.
    ///
    /// NOTE(review): `self.partitions[partition]` indexes directly, so an
    /// out-of-range partition index panics rather than returning an error.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Writes the partition count and, optionally, partition sizes, fetch
    /// limit, primary output ordering and constraints.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                // Only the first ordering (if any) is displayed.
                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| {
                        format!(", output_ordering={}", output_ordering)
                    })
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {}", constraints)
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={}", limit));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
        }
    }

    /// One output partition per stored partition; distribution is unknown.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    /// Equivalence properties derived from the projected schema and the
    /// known sort orderings.
    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.as_slice(),
        )
    }

    /// Statistics computed over all partitions, projection-aware.
    fn statistics(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    /// Returns a copy of this source with the given fetch limit applied.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = self.clone();
        Some(Arc::new(source.with_limit(limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// If the projection consists solely of plain (alias-free) column
    /// references, push it down into a new memory source; otherwise returns
    /// `Ok(None)` and the projection stays where it is.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        all_alias_free_columns(projection.expr())
            .then(|| {
                // Compose the incoming projection with any projection already
                // applied to this source (defaulting to the identity).
                let all_projections = (0..self.schema.fields().len()).collect();
                let new_projections = new_projections_for_columns(
                    projection,
                    self.projection().as_ref().unwrap_or(&all_projections),
                );

                MemorySourceConfig::try_new_exec(
                    self.partitions(),
                    self.original_schema(),
                    Some(new_projections),
                )
                .map(|e| e as _)
            })
            .transpose()
    }
}
482
483impl MemorySourceConfig {
484 pub fn try_new(
487 partitions: &[Vec<RecordBatch>],
488 schema: SchemaRef,
489 projection: Option<Vec<usize>>,
490 ) -> Result<Self> {
491 let projected_schema = project_schema(&schema, projection.as_ref())?;
492 Ok(Self {
493 partitions: partitions.to_vec(),
494 schema,
495 projected_schema,
496 projection,
497 sort_information: vec![],
498 show_sizes: true,
499 fetch: None,
500 })
501 }
502
503 pub fn try_new_exec(
506 partitions: &[Vec<RecordBatch>],
507 schema: SchemaRef,
508 projection: Option<Vec<usize>>,
509 ) -> Result<Arc<DataSourceExec>> {
510 let source = Self::try_new(partitions, schema, projection)?;
511 Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
512 }
513
514 pub fn try_new_as_values(
516 schema: SchemaRef,
517 data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
518 ) -> Result<Arc<DataSourceExec>> {
519 if data.is_empty() {
520 return plan_err!("Values list cannot be empty");
521 }
522
523 let n_row = data.len();
524 let n_col = schema.fields().len();
525
526 let placeholder_schema = Arc::new(Schema::empty());
529 let placeholder_batch = RecordBatch::try_new_with_options(
530 Arc::clone(&placeholder_schema),
531 vec![],
532 &RecordBatchOptions::new().with_row_count(Some(1)),
533 )?;
534
535 let arrays = (0..n_col)
537 .map(|j| {
538 (0..n_row)
539 .map(|i| {
540 let expr = &data[i][j];
541 let result = expr.evaluate(&placeholder_batch)?;
542
543 match result {
544 ColumnarValue::Scalar(scalar) => Ok(scalar),
545 ColumnarValue::Array(array) if array.len() == 1 => {
546 ScalarValue::try_from_array(&array, 0)
547 }
548 ColumnarValue::Array(_) => {
549 plan_err!("Cannot have array values in a values list")
550 }
551 }
552 })
553 .collect::<Result<Vec<_>>>()
554 .and_then(ScalarValue::iter_to_array)
555 })
556 .collect::<Result<Vec<_>>>()?;
557
558 let batch = RecordBatch::try_new_with_options(
559 Arc::clone(&schema),
560 arrays,
561 &RecordBatchOptions::new().with_row_count(Some(n_row)),
562 )?;
563
564 let partitions = vec![batch];
565 Self::try_new_from_batches(Arc::clone(&schema), partitions)
566 }
567
568 pub fn try_new_from_batches(
573 schema: SchemaRef,
574 batches: Vec<RecordBatch>,
575 ) -> Result<Arc<DataSourceExec>> {
576 if batches.is_empty() {
577 return plan_err!("Values list cannot be empty");
578 }
579
580 for batch in &batches {
581 let batch_schema = batch.schema();
582 if batch_schema != schema {
583 return plan_err!(
584 "Batch has invalid schema. Expected: {}, got: {}",
585 schema,
586 batch_schema
587 );
588 }
589 }
590
591 let partitions = vec![batches];
592 let source = Self {
593 partitions,
594 schema: Arc::clone(&schema),
595 projected_schema: Arc::clone(&schema),
596 projection: None,
597 sort_information: vec![],
598 show_sizes: true,
599 fetch: None,
600 };
601 Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
602 }
603
604 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
606 self.fetch = limit;
607 self
608 }
609
610 pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
612 self.show_sizes = show_sizes;
613 self
614 }
615
616 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
618 &self.partitions
619 }
620
621 pub fn projection(&self) -> &Option<Vec<usize>> {
623 &self.projection
624 }
625
626 pub fn show_sizes(&self) -> bool {
628 self.show_sizes
629 }
630
631 pub fn sort_information(&self) -> &[LexOrdering] {
633 &self.sort_information
634 }
635
636 pub fn try_with_sort_information(
656 mut self,
657 mut sort_information: Vec<LexOrdering>,
658 ) -> Result<Self> {
659 let fields = self.schema.fields();
661 let ambiguous_column = sort_information
662 .iter()
663 .flat_map(|ordering| ordering.clone())
664 .flat_map(|expr| collect_columns(&expr.expr))
665 .find(|col| {
666 fields
667 .get(col.index())
668 .map(|field| field.name() != col.name())
669 .unwrap_or(true)
670 });
671 if let Some(col) = ambiguous_column {
672 return internal_err!(
673 "Column {:?} is not found in the original schema of the MemorySourceConfig",
674 col
675 );
676 }
677
678 if let Some(projection) = &self.projection {
680 let base_eqp = EquivalenceProperties::new_with_orderings(
681 self.original_schema(),
682 &sort_information,
683 );
684 let proj_exprs = projection
685 .iter()
686 .map(|idx| {
687 let base_schema = self.original_schema();
688 let name = base_schema.field(*idx).name();
689 (Arc::new(Column::new(name, *idx)) as _, name.to_string())
690 })
691 .collect::<Vec<_>>();
692 let projection_mapping =
693 ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
694 sort_information = base_eqp
695 .project(&projection_mapping, Arc::clone(&self.projected_schema))
696 .into_oeq_class()
697 .into_inner();
698 }
699
700 self.sort_information = sort_information;
701 Ok(self)
702 }
703
704 pub fn original_schema(&self) -> SchemaRef {
706 Arc::clone(&self.schema)
707 }
708}
709
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    /// All orderings registered via `try_with_sort_information` should be
    /// reflected in the plan's equivalence properties, and the reported
    /// output ordering should be their concatenation.
    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        // Two independent orderings: (a, b) and (c).
        let sort1 = LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]);
        let sort2 = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort1.clone());
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        )));

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        // Both source orderings must be present in the ordering class.
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
765
#[cfg(test)]
mod tests {
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    /// `with_fetch` should cap the stream at the requested number of rows.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        // Only the first 4 of the 7 rows in partition 0 are produced.
        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// An empty values list must be rejected at plan time.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Building a plan from batches matching the schema succeeds.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// An empty batch list must be rejected.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// A batch whose schema differs from the declared schema must be rejected.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    /// A NULL value for a non-nullable column must fail plan construction,
    /// while a valid value for the same column succeeds.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // Same column, but a NULL literal: must be an error.
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics over an all-NULL values plan report exact row/null counts.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                // NOTE(review): the byte-size value is asserted as-is; it is
                // not the focus of this test.
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
}