use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use crate::sink::DataSink;
use crate::source::{DataSource, DataSourceExec};
use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
};
use datafusion_physical_plan::{
    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
    internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use futures::StreamExt;
use tokio::sync::RwLock;

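// NOTE: `MemoryExec` is deprecated; prefer `MemorySourceConfig` wrapped in a
// `DataSourceExec`. A minimal migration sketch (assuming a `schema: SchemaRef`
// and `partitions: Vec<Vec<RecordBatch>>` are in scope; names illustrative):
//
//     // Before (deprecated):
//     // let plan = MemoryExec::try_new(&partitions, schema, None)?;
//     // After:
//     let plan = MemorySourceConfig::try_new_exec(&partitions, schema, None)?;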
/// Execution plan for reading in-memory batches of data
#[derive(Clone)]
#[deprecated(
    since = "46.0.0",
    note = "use MemorySourceConfig and DataSourceExec instead"
)]
pub struct MemoryExec {
    inner: DataSourceExec,
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// if partition sizes should be displayed
    show_sizes: bool,
}

#[allow(unused, deprecated)]
impl Debug for MemoryExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(DisplayFormatType::Default, f)
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for MemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for MemoryExec {
    fn name(&self) -> &'static str {
        "MemoryExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // This is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // MemoryExec has no children
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in {self:?}")
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}

#[allow(unused, deprecated)]
impl MemoryExec {
    /// Create a new execution plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let source = MemorySourceConfig::try_new(partitions, schema, projection.clone())?;
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions: partitions.to_vec(),
            projection,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    /// Create a new execution plan from a list of constant values
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Self> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // We have this single row batch as a placeholder to satisfy evaluation
        // argument and generate a single output row
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each expression to a scalar, building one array per column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

    /// Create an execution plan from a list of record batches.
    /// All batches must have the same schema as `schema`; together they form a
    /// single partition.
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = MemorySourceConfig {
            partitions: partitions.clone(),
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        let data_source = DataSourceExec::new(Arc::new(source));
        Ok(Self {
            inner: data_source,
            partitions,
            projection: None,
            sort_information: vec![],
            show_sizes: true,
        })
    }

    fn memory_source_config(&self) -> MemorySourceConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<MemorySourceConfig>()
            .unwrap()
            .clone()
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.inner = self.inner.with_constraints(constraints);
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        // Rebuild the inner DataSourceExec so its config stays in sync
        let mut memory_source = self.memory_source_config();
        memory_source.show_sizes = show_sizes;
        self.show_sizes = show_sizes;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        self
    }

    /// Ref to constraints
    pub fn constraints(&self) -> &Constraints {
        self.properties().equivalence_properties().constraints()
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously;
    /// [`EquivalenceProperties`] keeps track of all the valid sort orders.
    /// The sort information must refer to the original (unprojected) schema;
    /// if a projection is present, the orderings are remapped by the
    /// underlying [`MemorySourceConfig`].
    pub fn try_with_sort_information(
        mut self,
        sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        self.sort_information = sort_information.clone();
        let mut memory_source = self.memory_source_config();
        memory_source = memory_source.try_with_sort_information(sort_information)?;
        self.inner = DataSourceExec::new(Arc::new(memory_source));
        Ok(self)
    }

    /// Arc clone of ref to original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.inner.schema())
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        partitions: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new_with_orderings(schema, orderings)
                .with_constraints(constraints),
            Partitioning::UnknownPartitioning(partitions.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

/// Data source configuration for reading in-memory batches of data
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection is applied
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// if partition sizes should be displayed
    show_sizes: bool,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    fetch: Option<usize>,
}

impl DataSource for MemorySourceConfig {
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

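    // Display sketch (illustrative, not from the original source): with two
    // partitions of three batches each and a pushed-down limit of 10, and no
    // ordering or constraints, the default format renders as
    // `partitions=2, partition_sizes=[3, 3], fetch=10`.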
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| {
                        format!(", output_ordering={}", output_ordering)
                    })
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {}", constraints)
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={}", limit));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
            DisplayFormatType::TreeRender => {
                // Count rows (not batches) so the rendered `rows=` is accurate
                let total_rows: usize = self
                    .partitions
                    .iter()
                    .flatten()
                    .map(|batch| batch.num_rows())
                    .sum();
                let total_bytes: usize = self
                    .partitions
                    .iter()
                    .flatten()
                    .map(|batch| batch.get_array_memory_size())
                    .sum();
                writeln!(f, "format=memory")?;
                writeln!(f, "rows={total_rows}")?;
                writeln!(f, "bytes={total_bytes}")?;
                Ok(())
            }
        }
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.as_slice(),
        )
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = self.clone();
        Some(Arc::new(source.with_limit(limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

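    // Projection-pushdown sketch (illustrative): if this source scans columns
    // [a, b, c] and the parent `ProjectionExec` keeps only bare column
    // references, e.g. `SELECT c, a`, the projection is folded into the source
    // as indices [2, 0] and the standalone projection node becomes unnecessary.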
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If there is any non-column or alias-carrier expression, the
        // projection should not be removed
        all_alias_free_columns(projection.expr())
            .then(|| {
                let all_projections = (0..self.schema.fields().len()).collect();
                let new_projections = new_projections_for_columns(
                    projection,
                    self.projection().as_ref().unwrap_or(&all_projections),
                );

                MemorySourceConfig::try_new_exec(
                    self.partitions(),
                    self.original_schema(),
                    Some(new_projections),
                )
                .map(|e| e as _)
            })
            .transpose()
    }
}

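// Construction sketch (illustrative; the schema, column name, and data are
// hypothetical, not from the original source):
//
//     use arrow::array::Int32Array;
//     use arrow::datatypes::{DataType, Field};
//
//     let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
//     let batch = RecordBatch::try_new(
//         Arc::clone(&schema),
//         vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
//     )?;
//     // One partition containing a single batch, no projection:
//     let plan = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;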
impl MemorySourceConfig {
    /// Create a new `MemorySourceConfig` for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Create a new `DataSourceExec` plan for reading in-memory record
    /// batches. The provided `schema` should not have the projection applied.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<DataSourceExec>> {
        let source = Self::try_new(partitions, schema, projection)?;
        Ok(DataSourceExec::from_data_source(source))
    }

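    /// Create a new execution plan from a list of constant values, one inner
    /// `Vec` per row. A usage sketch (the schema and literals are illustrative,
    /// not from the original source):
    ///
    /// ```ignore
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_physical_plan::expressions::lit;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
    /// // Two rows, one column: VALUES (1), (2)
    /// let plan = MemorySourceConfig::try_new_as_values(
    ///     schema,
    ///     vec![vec![lit(1i64)], vec![lit(2i64)]],
    /// )?;
    /// ```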
    pub fn try_new_as_values(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Arc<DataSourceExec>> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        let n_row = data.len();
        let n_col = schema.fields().len();

        // We have this single row batch as a placeholder to satisfy evaluation
        // argument and generate a single output row
        let placeholder_schema = Arc::new(Schema::empty());
        let placeholder_batch = RecordBatch::try_new_with_options(
            Arc::clone(&placeholder_schema),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        // Evaluate each expression against the placeholder batch to obtain one
        // scalar per cell, then assemble one array per column
        let arrays = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let expr = &data[i][j];
                        let result = expr.evaluate(&placeholder_batch)?;

                        match result {
                            ColumnarValue::Scalar(scalar) => Ok(scalar),
                            ColumnarValue::Array(array) if array.len() == 1 => {
                                ScalarValue::try_from_array(&array, 0)
                            }
                            ColumnarValue::Array(_) => {
                                plan_err!("Cannot have array values in a values list")
                            }
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;

        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arrays,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;

        let partitions = vec![batch];
        Self::try_new_from_batches(Arc::clone(&schema), partitions)
    }

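    /// Create a new plan from a list of record batches, all of which must
    /// match `schema` exactly; together the batches form a single partition.
    /// A usage sketch (assuming a `batch: RecordBatch` is in scope;
    /// illustrative, not from the original source):
    ///
    /// ```ignore
    /// let plan = MemorySourceConfig::try_new_from_batches(
    ///     batch.schema(),
    ///     vec![batch.clone(), batch],
    /// )?;
    /// ```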
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Arc<DataSourceExec>> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {}, got: {}",
                    schema,
                    batch_schema
                );
            }
        }

        let partitions = vec![batches];
        let source = Self {
            partitions,
            schema: Arc::clone(&schema),
            projected_schema: Arc::clone(&schema),
            projection: None,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        };
        Ok(DataSourceExec::from_data_source(source))
    }

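    /// Set the maximum number of rows to read (`None` reads all rows). The
    /// limit is applied to each opened partition stream; e.g. (illustrative)
    /// `source.with_limit(Some(10))` makes every partition's stream stop after
    /// its first 10 rows.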
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Set `show_sizes` to determine whether to display partition sizes
    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
        self.show_sizes = show_sizes;
        self
    }

    /// Ref to partitions
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Ref to projection
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Show sizes
    pub fn show_sizes(&self) -> bool {
        self.show_sizes
    }

    /// Ref to sort information
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// A memory table can be ordered by multiple expressions simultaneously;
    /// [`EquivalenceProperties`] keeps track of all the valid sort orders.
    ///
    /// The given sort information must refer to the original (unprojected)
    /// schema. If the source has a projection, the orderings are projected
    /// through it so that they describe the output schema. Returns an error
    /// if any sort expression references a column that is not in the original
    /// schema.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // All sort expressions must refer to the original schema
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the MemorySourceConfig",
                col
            );
        }

        // If there is a projection on the source, we also need to project orderings
        if let Some(projection) = &self.projection {
            let base_eqp = EquivalenceProperties::new_with_orderings(
                self.original_schema(),
                &sort_information,
            );
            let proj_exprs = projection
                .iter()
                .map(|idx| {
                    let base_schema = self.original_schema();
                    let name = base_schema.field(*idx).name();
                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
                })
                .collect::<Vec<_>>();
            let projection_mapping =
                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
            sort_information = base_eqp
                .project(&projection_mapping, Arc::clone(&self.projected_schema))
                .into_oeq_class()
                .into_inner();
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// Arc clone of ref to original schema
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

/// Type alias for partition data
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;

/// A [`DataSink`] that writes incoming record batches to in-memory partitions
pub struct MemSink {
    /// Target locations for writing data
    batches: Vec<PartitionData>,
    /// Schema of the data to write
    schema: SchemaRef,
}

impl Debug for MemSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MemSink")
            .field("num_partitions", &self.batches.len())
            .finish()
    }
}

impl DisplayAs for MemSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_count = self.batches.len();
                write!(f, "MemoryTable (partitions={partition_count})")
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}

impl MemSink {
    /// Create a new [`MemSink`].
    ///
    /// The caller is responsible for providing at least one partition to
    /// insert into and for ensuring the written data matches `schema`.
    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Cannot insert into MemTable with zero partitions");
        }
        Ok(Self { batches, schema })
    }
}

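// Round-robin sketch (illustrative): with two target partitions, an input
// stream of batches b0, b1, b2, b3 is buffered as [b0, b2] for partition 0 and
// [b1, b3] for partition 1, then appended to each partition under its lock.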
#[async_trait]
impl DataSink for MemSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let num_partitions = self.batches.len();

        // buffer up the data round robin style into num_partitions
        let mut new_batches = vec![vec![]; num_partitions];
        let mut i = 0;
        let mut row_count = 0;
        while let Some(batch) = data.next().await.transpose()? {
            row_count += batch.num_rows();
            new_batches[i].push(batch);
            i = (i + 1) % num_partitions;
        }

        // write the outputs into the batches
        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
            // Append all the new batches in one go to minimize locking overhead
            target.write().await.append(&mut batches);
        }

        Ok(row_count as u64)
    }
}

#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));
        let sort1 = LexOrdering::new(vec![
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions::default(),
            },
        ]);
        let sort2 = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("c", &schema)?,
            options: SortOptions::default(),
        }]);
        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort1.clone());
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let mem_exec = DataSourceExec::from_data_source(
            MemorySourceConfig::try_new(&[vec![]], schema, None)?
                .try_with_sort_information(sort_information)?,
        );

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );
        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut it = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(batch) = it.next().await {
            results.push(batch?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();
        // A null value must be rejected for the non-nullable column
        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8), // not important
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows), // there are only nulls
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
}