use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
use crate::{
    display::FileGroupsDisplay, file::FileSource,
    file_compression_type::FileCompressionType, file_stream::FileStream,
    source::DataSource, statistics::MinMaxStatistics, PartitionedFile, TableSchema,
};
use arrow::datatypes::FieldRef;
use arrow::{
    array::{
        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
        RecordBatchOptions,
    },
    buffer::Buffer,
    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
    exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics,
    Constraints, Result, ScalarValue, Statistics,
};
use datafusion_execution::{
    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::projection::{
    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
};
use datafusion_physical_plan::{
    display::{display_orderings, ProjectSchemaDisplay},
    filter_pushdown::FilterPushdownPropagation,
    metrics::ExecutionPlanMetricsSet,
    DisplayAs, DisplayFormatType,
};
use std::{
    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};

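/// Configuration for scanning a collection of files that share a common
/// schema, read by a format-specific [`FileSource`] (Parquet, CSV, ...).
/// Instances are normally assembled with [`FileScanConfigBuilder`].
///
/// A minimal construction sketch; `schema` and `source` are assumed to be a
/// `SchemaRef` and an `Arc<dyn FileSource>` already available in the caller's
/// scope:
///
/// ```ignore
/// let config = FileScanConfigBuilder::new(
///     ObjectStoreUrl::parse("file:///")?,
///     schema,
///     source,
/// )
/// // scan a single 1024-byte file as its own file group
/// .with_file(PartitionedFile::new("data/file.parquet", 1024))
/// .with_limit(Some(1000))
/// .build();
/// ```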
#[derive(Clone)]
pub struct FileScanConfig {
    /// Object store URL, used to resolve the object store at execution time.
    pub object_store_url: ObjectStoreUrl,
    /// Schema of the table, including any table partition columns.
    pub table_schema: TableSchema,
    /// Files to scan, grouped so that each group is read as one partition.
    pub file_groups: Vec<FileGroup>,
    /// Constraints (e.g. uniqueness) declared on the scanned table.
    pub constraints: Constraints,
    /// Optional projection applied on top of the table schema.
    pub projection_exprs: Option<ProjectionExprs>,
    /// Maximum number of rows to read; `None` means no limit.
    pub limit: Option<usize>,
    /// Lexicographical orderings that describe the scan output.
    pub output_ordering: Vec<LexOrdering>,
    /// Compression of the underlying files.
    pub file_compression_type: FileCompressionType,
    /// Whether quoted values may contain embedded newlines (CSV-style formats).
    pub new_lines_in_values: bool,
    /// Format-specific source (Parquet, CSV, ...) that opens the files.
    pub file_source: Arc<dyn FileSource>,
    /// Batch size override; falls back to the session batch size when `None`.
    pub batch_size: Option<usize>,
    /// Optional factory for adapting expressions to each file's physical schema.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

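/// Builder for [`FileScanConfig`]. Any optional setting left unset falls back
/// to a default in [`FileScanConfigBuilder::build`]: unknown statistics, no
/// compression, no limit, and no projection.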
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    table_schema: TableSchema,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    projection_indices: Option<Vec<usize>>,
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    file_compression_type: Option<FileCompressionType>,
    new_lines_in_values: Option<bool>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl FileScanConfigBuilder {
    /// Create a builder from the mandatory scan inputs.
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        Self {
            object_store_url,
            table_schema: TableSchema::from_file_schema(file_schema),
            file_source,
            file_groups: vec![],
            statistics: None,
            output_ordering: vec![],
            file_compression_type: None,
            new_lines_in_values: None,
            limit: None,
            projection_indices: None,
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
        }
    }

    /// Set the maximum number of rows to read.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    /// Replace the file source.
    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source;
        self
    }

    /// Return the full table schema (file schema plus partition columns).
    pub fn table_schema(&self) -> &SchemaRef {
        self.table_schema.table_schema()
    }

    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
        self.with_projection_indices(indices)
    }

    /// Set the indices (into the table schema) of the columns to project.
    pub fn with_projection_indices(mut self, indices: Option<Vec<usize>>) -> Self {
        self.projection_indices = indices;
        self
    }

    /// Set the table partition columns (e.g. Hive-style `date=.../` columns).
    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
        let table_partition_cols: Vec<FieldRef> = table_partition_cols
            .into_iter()
            .map(|f| Arc::new(f) as FieldRef)
            .collect();
        self.table_schema = self
            .table_schema
            .with_table_partition_cols(table_partition_cols);
        self
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = Some(constraints);
        self
    }

    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
    }

    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.file_groups = file_groups;
        self
    }

    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }

    /// Add a single file as its own file group.
    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
        self.with_file_group(FileGroup::new(vec![partitioned_file]))
    }

    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = Some(file_compression_type);
        self
    }

    /// Declare whether quoted values may contain embedded newlines.
    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
        self.new_lines_in_values = Some(new_lines_in_values);
        self
    }

    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_expr_adapter(
        mut self,
        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    ) -> Self {
        self.expr_adapter_factory = expr_adapter;
        self
    }

    /// Consume the builder, filling in defaults for anything unset.
    pub fn build(self) -> FileScanConfig {
        let Self {
            object_store_url,
            table_schema,
            file_source,
            limit,
            projection_indices,
            constraints,
            file_groups,
            statistics,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        } = self;

        let constraints = constraints.unwrap_or_default();
        let statistics = statistics
            .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema()));

        let file_source = file_source
            .with_statistics(statistics.clone())
            .with_schema(table_schema.clone());
        let file_compression_type =
            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
        let new_lines_in_values = new_lines_in_values.unwrap_or(false);

        let projection_exprs = projection_indices.map(|indices| {
            ProjectionExprs::from_indices(&indices, table_schema.table_schema())
        });

        FileScanConfig {
            object_store_url,
            table_schema,
            file_source,
            limit,
            projection_exprs,
            constraints,
            file_groups,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        }
    }
}

impl From<FileScanConfig> for FileScanConfigBuilder {
    fn from(config: FileScanConfig) -> Self {
        Self {
            object_store_url: config.object_store_url,
            table_schema: config.table_schema,
            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
            file_groups: config.file_groups,
            statistics: config.file_source.statistics().ok(),
            output_ordering: config.output_ordering,
            file_compression_type: Some(config.file_compression_type),
            new_lines_in_values: Some(config.new_lines_in_values),
            limit: config.limit,
            projection_indices: config
                .projection_exprs
                .map(|p| p.ordered_column_indices()),
            constraints: Some(config.constraints),
            batch_size: config.batch_size,
            expr_adapter_factory: config.expr_adapter_factory,
        }
    }
}

impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self
            .file_source
            .with_batch_size(batch_size)
            .with_projection(self);

        let opener = source.create_file_opener(object_store, self, partition);

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let schema = self.projected_schema();
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        let (schema, constraints, _, orderings) = self.project();
        let mut eq_properties =
            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
                .with_constraints(constraints);
        if let Some(filter) = self.file_source.filter() {
            // Losing the filter-derived equivalences only costs an
            // optimization, so warn in release builds but panic in debug
            // builds to surface bugs
            match Self::add_filter_equivalence_info(filter, &mut eq_properties, &schema) {
                Ok(()) => {}
                Err(e) => {
                    warn!("Failed to add filter equivalence info: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to add filter equivalence info: {e}");
                }
            }
        }
        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(partition) = partition {
            if let Some(file_group) = self.file_groups.get(partition) {
                if let Some(stat) = file_group.file_statistics(None) {
                    let table_cols_stats = self
                        .projection_indices()
                        .into_iter()
                        .map(|idx| {
                            if idx < self.file_schema().fields().len() {
                                stat.column_statistics[idx].clone()
                            } else {
                                // Partition columns have no file-level statistics
                                ColumnStatistics::new_unknown()
                            }
                        })
                        .collect();

                    return Ok(Statistics {
                        num_rows: stat.num_rows,
                        total_byte_size: stat.total_byte_size,
                        column_statistics: table_cols_stats,
                    });
                }
            }
            Ok(Statistics::new_unknown(&self.projected_schema()))
        } else {
            Ok(self.projected_stats())
        }
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>> {
        // A projection can only be pushed into the scan if it references no
        // partition columns and introduces no aliases
        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
            proj_expr
                .expr
                .as_any()
                .downcast_ref::<Column>()
                .map(|expr| expr.index() >= self.file_schema().fields().len())
                .unwrap_or(false)
        });

        let no_aliases = all_alias_free_columns(projection);

        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
            let file_scan = self.clone();
            let source = Arc::clone(&file_scan.file_source);
            let new_projections = new_projections_for_columns(
                projection,
                &file_scan
                    .projection_exprs
                    .as_ref()
                    .map(|p| p.ordered_column_indices())
                    .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
            );

            Arc::new(
                FileScanConfigBuilder::from(file_scan)
                    .with_projection_indices(Some(new_projections))
                    .with_source(source)
                    .build(),
            ) as _
        }))
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let result = self.file_source.try_pushdown_filters(filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let file_scan_config = FileScanConfigBuilder::from(self.clone())
                    .with_source(new_file_source)
                    .build();
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(file_scan_config) as _),
                })
            }
            None => {
                // The file source made no changes; pass the result through
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: None,
                })
            }
        }
    }
}

impl FileScanConfig {
    /// Schema of the files on disk, without partition columns.
    pub fn file_schema(&self) -> &SchemaRef {
        self.table_schema.file_schema()
    }

    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.table_schema.table_partition_cols()
    }

    /// Indices of the projected columns, or all table columns when there is
    /// no projection.
    fn projection_indices(&self) -> Vec<usize> {
        match &self.projection_exprs {
            Some(proj) => proj.ordered_column_indices(),
            None => (0..self.file_schema().fields().len()
                + self.table_partition_cols().len())
                .collect(),
        }
    }

    pub fn projected_stats(&self) -> Statistics {
        let statistics = self.file_source.statistics().unwrap();

        let table_cols_stats = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema().fields().len() {
                    statistics.column_statistics[idx].clone()
                } else {
                    // Partition columns have no file-level statistics
                    ColumnStatistics::new_unknown()
                }
            })
            .collect();

        Statistics {
            num_rows: statistics.num_rows,
            total_byte_size: statistics.total_byte_size,
            column_statistics: table_cols_stats,
        }
    }

    pub fn projected_schema(&self) -> Arc<Schema> {
        let table_fields: Vec<_> = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema().fields().len() {
                    self.file_schema().field(idx).clone()
                } else {
                    // Indices past the file schema refer to partition columns
                    let partition_idx = idx - self.file_schema().fields().len();
                    Arc::unwrap_or_clone(Arc::clone(
                        &self.table_partition_cols()[partition_idx],
                    ))
                }
            })
            .collect();

        Arc::new(Schema::new_with_metadata(
            table_fields,
            self.file_schema().metadata().clone(),
        ))
    }

    /// Register `a = b` conjuncts of the pushed-down filter as equivalence
    /// classes so the optimizer can exploit them.
    fn add_filter_equivalence_info(
        filter: Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
        schema: &Schema,
    ) -> Result<()> {
        let equal_pairs = split_conjunction(&filter).into_iter().filter_map(|expr| {
            // Remap column indices to the projected schema; conjuncts that
            // reference non-projected columns are skipped
            reassign_expr_columns(Arc::clone(expr), schema)
                .ok()
                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
                    Some(expr) if expr.op() == &Operator::Eq => {
                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
                    }
                    _ => None,
                })
        });

        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(lhs, rhs)?
        }

        Ok(())
    }

    pub fn projected_constraints(&self) -> Constraints {
        let indexes = self.projection_indices();
        self.constraints.project(&indexes).unwrap_or_default()
    }

    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }

    /// Project the schema, constraints, statistics, and output orderings in
    /// one pass.
    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
        if self.projection_exprs.is_none() && self.table_partition_cols().is_empty() {
            return (
                Arc::clone(self.file_schema()),
                self.constraints.clone(),
                self.file_source.statistics().unwrap().clone(),
                self.output_ordering.clone(),
            );
        }

        let schema = self.projected_schema();
        let constraints = self.projected_constraints();
        let stats = self.projected_stats();

        let output_ordering = get_projected_output_ordering(self, &schema);

        (schema, constraints, stats, output_ordering)
    }

    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
        let fields = self.file_schema().fields();

        self.projection_exprs.as_ref().map(|p| {
            let column_indices = p.ordered_column_indices();

            column_indices
                .iter()
                .filter(|&&col_i| col_i < fields.len())
                .map(|&col_i| self.file_schema().field(col_i).name())
                .cloned()
                .collect::<Vec<_>>()
        })
    }

    pub fn projected_file_schema(&self) -> SchemaRef {
        let fields = self.file_column_projection_indices().map(|indices| {
            indices
                .iter()
                .map(|col_idx| self.file_schema().field(*col_idx))
                .cloned()
                .collect::<Vec<_>>()
        });

        fields.map_or_else(
            || Arc::clone(self.file_schema()),
            |f| {
                Arc::new(Schema::new_with_metadata(
                    f,
                    self.file_schema().metadata.clone(),
                ))
            },
        )
    }

    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        self.projection_exprs.as_ref().map(|p| {
            p.ordered_column_indices()
                .into_iter()
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }

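    /// Divide the files into up to `target_partitions` groups such that each
    /// group is non-overlapping with respect to `sort_order`, judged by the
    /// files' min/max statistics. Files are assigned greedily to the smallest
    /// compatible group, so heavy overlap can force more groups than the
    /// target. Returns an error if `target_partitions` is zero or if any file
    /// lacks the required statistics.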
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(internal_datafusion_err!(
                "target_partitions must be greater than 0"
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        // Start with `target_partitions` empty groups
        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            // Prefer the smallest group whose last file ends before this file
            // begins, keeping every group sorted and the sizes balanced
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                // No existing group can accept this file; start a new one
                file_groups_indices.push(vec![idx]);
            }
        }

        // Drop any target groups that were never used
        file_groups_indices.retain(|group| !group.is_empty());

        // Assemble indices back into groups of PartitionedFiles
        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

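    /// Regroup files by their min/max statistics on `sort_order` so that each
    /// returned group is internally non-overlapping. Unlike the
    /// target-partitions variant above, the number of groups is driven purely
    /// by how much the files overlap. A minimal usage sketch (the schema,
    /// groups, and ordering are assumed to exist in the caller's scope):
    ///
    /// ```ignore
    /// let regrouped = FileScanConfig::split_groups_by_statistics(
    ///     &table_schema, // SchemaRef the sort order refers to
    ///     &file_groups,  // existing &[FileGroup]
    ///     &sort_order,   // LexOrdering covered by the file statistics
    /// )?;
    /// ```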
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            // Place each file into the first group it does not overlap
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        // Assemble indices back into groups of PartitionedFiles
        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
}

impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        write!(
            f,
            "statistics={:?}, ",
            self.file_source.statistics().unwrap()
        )?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}

impl DisplayAs for FileScanConfig {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let schema = self.projected_schema();
        let orderings = get_projected_output_ordering(self, &schema);

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}

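/// Splices partition-value columns into [`RecordBatch`]es read from files so
/// the batches match the projected table schema. Each partition value is
/// materialized as a constant dictionary array whose zeroed key buffer is
/// cached and reused across batches.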
pub struct PartitionColumnProjector {
    /// Cached zero-filled key buffers, one per dictionary key type
    key_buffer_cache: ZeroBufferGenerators,
    /// (partition column index, projected schema index) pairs, sorted by
    /// position in the projected schema
    projected_partition_indexes: Vec<(usize, usize)>,
    /// Schema of the batches after partition columns are inserted
    projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
        let mut idx_map = HashMap::new();
        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
                idx_map.insert(partition_idx, schema_idx);
            }
        }

        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

        Self {
            projected_partition_indexes,
            key_buffer_cache: Default::default(),
            projected_schema,
        }
    }

    pub fn project(
        &mut self,
        file_batch: RecordBatch,
        partition_values: &[ScalarValue],
    ) -> Result<RecordBatch> {
        let expected_cols =
            self.projected_schema.fields().len() - self.projected_partition_indexes.len();

        if file_batch.columns().len() != expected_cols {
            return exec_err!(
                "Unexpected batch schema from file, expected {} cols but got {}",
                expected_cols,
                file_batch.columns().len()
            );
        }

        let mut cols = file_batch.columns().to_vec();
        for &(pidx, sidx) in &self.projected_partition_indexes {
            let p_value = partition_values.get(pidx).ok_or_else(|| {
                exec_datafusion_err!("Invalid partitioning found on disk")
            })?;

            let mut partition_value = Cow::Borrowed(p_value);

            // Check if the caller forgot to dictionary-encode the partition
            // value and, if so, wrap it on their behalf
            let field = self.projected_schema.field(sidx);
            let expected_data_type = field.data_type();
            let actual_data_type = partition_value.data_type();
            if let DataType::Dictionary(key_type, _) = expected_data_type {
                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
                    warn!(
                        "Partition value for column {} was not dictionary-encoded, applied auto-fix.",
                        field.name()
                    );
                    partition_value = Cow::Owned(ScalarValue::Dictionary(
                        key_type.clone(),
                        Box::new(partition_value.as_ref().clone()),
                    ));
                }
            }

            cols.insert(
                sidx,
                create_output_array(
                    &mut self.key_buffer_cache,
                    partition_value.as_ref(),
                    file_batch.num_rows(),
                )?,
            )
        }

        RecordBatch::try_new_with_options(
            Arc::clone(&self.projected_schema),
            cols,
            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
        )
        .map_err(Into::into)
    }
}

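/// Per-key-type cache of zero-filled buffers used to build the key arrays of
/// constant dictionary columns (see [`create_dict_array`]).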
#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}

/// Generates an arrow [`Buffer`] of zeroed values of type `T`, caching the
/// largest buffer built so far.
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    cache: Option<Buffer>,
    _t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    const SIZE: usize = size_of::<T>();

    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
        match &mut self.cache {
            // Reuse the cached buffer when it is already large enough
            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
                buf.slice_with_length(0, n_vals * Self::SIZE)
            }
            _ => {
                // Build a new zeroed buffer and cache it for later calls
                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals); // `advance` zero-fills
                self.cache.insert(key_buffer_builder.finish()).clone()
            }
        }
    }
}

fn create_dict_array<T>(
    buffer_gen: &mut ZeroBufferGenerator<T>,
    dict_val: &ScalarValue,
    len: usize,
    data_type: DataType,
) -> Result<ArrayRef>
where
    T: ArrowNativeType,
{
    let dict_vals = dict_val.to_array()?;

    let sliced_key_buffer = buffer_gen.get_buffer(len);

    // Build a DictionaryArray whose keys all point at the single value
    let mut builder = ArrayData::builder(data_type)
        .len(len)
        .add_buffer(sliced_key_buffer);
    builder = builder.add_child_data(dict_vals.to_data());
    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
        builder.build().unwrap(),
    )))
}

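/// Create a length-`len` column holding a single partition value, reusing a
/// cached zero-filled key buffer when the value is dictionary-encoded so
/// repeated batches do not reallocate keys.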
fn create_output_array(
    key_buffer_cache: &mut ZeroBufferGenerators,
    val: &ScalarValue,
    len: usize,
) -> Result<ArrayRef> {
    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
        match key_type.as_ref() {
            DataType::Int8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            _ => {}
        }
    }

    val.to_array_of_size(len)
}

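/// Keep only those output orderings that still hold after projection: an
/// ordering is dropped when some multi-file group cannot be shown, via
/// min/max statistics, to be sorted on it.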
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let projected_orderings =
        project_orderings(&base_config.output_ordering, projected_schema);

    let mut all_orderings = vec![];
    for new_ordering in projected_orderings {
        if base_config.file_groups.iter().any(|group| {
            // Single-file groups are trivially sorted
            if group.len() <= 1 {
                return false;
            }

            let indices = base_config
                .projection_exprs
                .as_ref()
                .map(|p| p.ordered_column_indices());

            let statistics = match MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                indices.as_deref(),
                group.iter(),
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    // Without statistics we cannot prove the group is sorted
                    return true;
                }
            };

            !statistics.is_sorted()
        }) {
            debug!(
                "Skipping specified output ordering {:?}. \
                 Some file groups couldn't be determined to be sorted: {:?}",
                base_config.output_ordering[0], base_config.file_groups
            );
            continue;
        }

        all_orderings.push(new_ordering);
    }
    all_orderings
}

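/// Wrap a partition column's type in `Dictionary(UInt16, _)` so the constant
/// per-file value is stored once rather than once per row.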
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

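/// Wrap a partition value in a dictionary scalar, mirroring
/// [`wrap_partition_type_in_dict`]. For example,
/// `wrap_partition_value_in_dict(ScalarValue::from("2021"))` yields a
/// `ScalarValue::Dictionary(UInt16, Utf8("2021"))`.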
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::col;
    use crate::{
        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
        verify_sort_integrity,
    };

    use arrow::array::{Int32Array, RecordBatch};
    use datafusion_common::stats::Precision;
    use datafusion_common::{assert_batches_eq, internal_err};
    use datafusion_expr::{Operator, SortExpr};
    use datafusion_physical_expr::create_physical_sort_expr;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

    pub fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }

    #[test]
    fn physical_plan_config_no_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            proj_schema.field(file_schema.fields().len()).name(),
            "date",
            "partition columns are the last columns"
        );
        assert_eq!(
            proj_statistics.column_statistics.len(),
            file_schema.fields().len() + 1
        );
        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, None);

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, None);
    }

    #[test]
    fn physical_plan_config_no_projection_tab_cols_as_field() {
        let file_schema = aggr_test_schema();

        // Define a partition column with explicit metadata on the field
        let table_partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
                .with_metadata(HashMap::from_iter(vec![(
                    "key_whatever".to_owned(),
                    "value_whatever".to_owned(),
                )]));

        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![table_partition_col.clone()],
        );

        let proj_schema = conf.projected_schema();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            *proj_schema.field(file_schema.fields().len()),
            table_partition_col,
            "partition columns are the last columns and must have all values defined in the created field"
        );
    }

    #[test]
    fn physical_plan_config_with_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            Some(vec![file_schema.fields().len(), 0]),
            Statistics {
                num_rows: Precision::Inexact(10),
                // use the column index as its distinct count so projected
                // statistics can be told apart below
                column_statistics: (0..file_schema.fields().len())
                    .map(|i| ColumnStatistics {
                        distinct_count: Precision::Inexact(i),
                        ..Default::default()
                    })
                    .collect(),
                total_byte_size: Precision::Absent,
            },
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(
            columns(&proj_schema),
            vec!["date".to_owned(), "c1".to_owned()]
        );
        let proj_stat_cols = proj_statistics.column_statistics;
        assert_eq!(proj_stat_cols.len(), 2);
        // "c1" is the column at index 0, so its distinct count is Inexact(0)
        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));

        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, Some(vec!["c1".to_owned()]));

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, Some(vec![0]));
    }

    #[test]
    fn partition_column_projector() {
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let partition_cols = vec![
            (
                "year".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "month".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "day".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];
        let statistics = Statistics {
            num_rows: Precision::Inexact(3),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&file_batch.schema()),
        };

        let conf = config_for_projection(
            file_batch.schema(),
            // keep all file columns plus "year" and "day", skipping "month"
            Some(vec![
                0,
                1,
                2,
                file_batch.schema().fields().len(),
                file_batch.schema().fields().len() + 2,
            ]),
            statistics.clone(),
            to_partition_cols(partition_cols.clone()),
        );

        let source_statistics = conf.file_source.statistics().unwrap();
        let conf_stats = conf.partition_statistics(None).unwrap();

        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

        // 3 file columns + 2 projected partition columns
        assert_eq!(conf_stats.column_statistics.len(), 5);

        // the file source still reports the original, unprojected statistics
        assert_eq!(source_statistics, statistics);
        assert_eq!(source_statistics.column_statistics.len(), 3);

        let proj_schema = conf.projected_schema();
        let mut proj = PartitionColumnProjector::new(
            proj_schema,
            &partition_cols
                .iter()
                .map(|x| x.0.clone())
                .collect::<Vec<_>>(),
        );

        // project the first batch
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("26")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // project a batch that is larger than the previous one
        let file_batch = build_table_i32(
            ("a", &vec![5, 6, 7, 8, 9]),
            ("b", &vec![-10, -9, -8, -7, -6]),
            ("c", &vec![12, 13, 14, 15, 16]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("27")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+-----+----+------+-----+",
            "| a | b   | c  | year | day |",
            "+---+-----+----+------+-----+",
            "| 5 | -10 | 12 | 2021 | 27  |",
            "| 6 | -9  | 13 | 2021 | 27  |",
            "| 7 | -8  | 14 | 2021 | 27  |",
            "| 8 | -7  | 15 | 2021 | 27  |",
            "| 9 | -6  | 16 | 2021 | 27  |",
            "+---+-----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // project a batch that is smaller than the previous one
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 3]),
            ("b", &vec![2, 3, 4]),
            ("c", &vec![4, 5, 6]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("28")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+---+---+------+-----+",
            "| a | b | c | year | day |",
            "+---+---+---+------+-----+",
            "| 0 | 2 | 4 | 2021 | 28  |",
            "| 1 | 3 | 5 | 2021 | 28  |",
            "| 3 | 4 | 6 | 2021 | 28  |",
            "+---+---+---+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // project a batch whose partition values were not dictionary-wrapped;
        // the projector should auto-fix them
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    ScalarValue::from("2021"),
                    ScalarValue::from("10"),
                    ScalarValue::from("26"),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);
    }

    #[test]
    fn test_projected_file_schema_with_partition_col() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        // projection that includes a partition column
        let projection = config_for_projection(
            schema.clone(),
            Some(vec![0, 3, 5, schema.fields().len()]),
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        // the partition column is filtered out of the projected file schema
        let expected_columns = vec!["c1", "c4", "c6"];
        let actual_columns = projection
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect::<Vec<_>>();
        assert_eq!(expected_columns, actual_columns);
    }

    #[test]
    fn test_projected_file_schema_without_projection() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        // without a projection, the projected file schema equals the file schema
        let projection = config_for_projection(
            schema.clone(),
            None,
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        assert_eq!(projection.fields(), schema.fields());
    }

    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{path::Path, ObjectMeta};

        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            // same files as above, listed in a different order: the grouping
            // must not depend on the input order
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable(
                        "0",
                        "2023-01-01",
                        vec![Some((Some(0.00), Some(0.49)))],
                    ),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable(
                        "1",
                        "2023-01-01",
                        vec![Some((Some(0.50), Some(1.00)))],
                    ),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
            },
        ];

        for case in cases {
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            // map the grouped files back to their short names for comparison
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                PartitionedFile {
                    object_meta: ObjectMeta {
                        location: Path::from(format!(
                            "data/date={}/{}.parquet",
                            file.date, file.name
                        )),
                        last_modified: chrono::Utc.timestamp_nanos(0),
                        size: 0,
                        e_tag: None,
                        version: None,
                    },
                    partition_values: vec![ScalarValue::from(file.date)],
                    range: None,
                    statistics: Some(Arc::new(Statistics {
                        num_rows: Precision::Absent,
                        total_byte_size: Precision::Absent,
                        column_statistics: file
                            .statistics
                            .into_iter()
                            .map(|stats| {
                                stats
                                    .map(|(min, max)| ColumnStatistics {
                                        min_value: Precision::Exact(
                                            ScalarValue::Float64(min),
                                        ),
                                        max_value: Precision::Exact(
                                            ScalarValue::Float64(max),
                                        ),
                                        ..Default::default()
                                    })
                                    .unwrap_or_default()
                            })
                            .collect::<Vec<_>>(),
                    })),
                    extensions: None,
                    metadata_size_hint: None,
                }
            }
        }
    }

    // Construct a FileScanConfig for the given schema, projection,
    // statistics, and partition columns, backed by a mock file source
    fn config_for_projection(
        file_schema: SchemaRef,
        projection: Option<Vec<usize>>,
        statistics: Statistics,
        table_partition_cols: Vec<Field>,
    ) -> FileScanConfig {
        FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema,
            Arc::new(MockSource::default()),
        )
        .with_projection_indices(projection)
        .with_statistics(statistics)
        .with_table_partition_cols(table_partition_cols)
        .build()
    }

    // Convert partition columns from (name, DataType) to non-nullable Fields
    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
        table_partition_cols
            .iter()
            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
            .collect::<Vec<_>>()
    }

    // Build a RecordBatch with three Int32 columns from (name, values) pairs
    pub fn build_table_i32(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
    ) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new(a.0, DataType::Int32, false),
            Field::new(b.0, DataType::Int32, false),
            Field::new(c.0, DataType::Int32, false),
        ]);

        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(a.1.clone())),
                Arc::new(Int32Array::from(b.1.clone())),
                Arc::new(Int32Array::from(c.1.clone())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_file_scan_config_builder() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        let builder = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        );

        // apply configuration through the builder methods
        let config = builder
            .with_limit(Some(1000))
            .with_projection_indices(Some(vec![0, 1]))
            .with_table_partition_cols(vec![Field::new(
                "date",
                wrap_partition_type_in_dict(DataType::Utf8),
                false,
            )])
            .with_statistics(Statistics::new_unknown(&file_schema))
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "test.parquet".to_string(),
                1024,
            )])])
            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
                Column::new("date", 0),
            ))]
            .into()])
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .with_newlines_in_values(true)
            .build();

        // verify the built config reflects every setting
        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, Some(1000));
        assert_eq!(
            config.projection_exprs.as_ref().map(|p| p.column_indices()),
            Some(vec![0, 1])
        );
        assert_eq!(config.table_partition_cols().len(), 1);
        assert_eq!(config.table_partition_cols()[0].name(), "date");
        assert_eq!(config.file_groups.len(), 1);
        assert_eq!(config.file_groups[0].len(), 1);
        assert_eq!(
            config.file_groups[0][0].object_meta.location.as_ref(),
            "test.parquet"
        );
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(config.new_lines_in_values);
        assert_eq!(config.output_ordering.len(), 1);
    }

    #[test]
    fn equivalence_properties_after_schema_change() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        // push a `c2 = 10` filter into the source
        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
                col("c2", &file_schema).unwrap(),
                Operator::Eq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            ))));

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        )
        .with_projection_indices(Some(vec![0, 1, 2]))
        .build();

        // narrow the projection to just "c3"; "c2" is projected away
        let data_source = config
            .try_swapping_with_projection(&[ProjectionExpr::new(
                col("c3", &file_schema).unwrap(),
                "c3".to_string(),
            )])
            .unwrap()
            .unwrap();

        // equivalence properties must not reference removed columns
        let eq_properties = data_source.eq_properties();
        let eq_group = eq_properties.eq_group();

        for class in eq_group.iter() {
            for expr in class.iter() {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    assert_ne!(
                        col.name(),
                        "c2",
                        "c2 should not be present in any equivalence class"
                    );
                }
            }
        }
    }

    #[test]
    fn test_file_scan_config_builder_defaults() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        // build with only the required parameters
        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        )
        .build();

        // verify default values
        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, None);
        assert_eq!(
            config.projection_exprs.as_ref().map(|p| p.column_indices()),
            None
        );
        assert!(config.table_partition_cols().is_empty());
        assert!(config.file_groups.is_empty());
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(!config.new_lines_in_values);
        assert!(config.output_ordering.is_empty());
        assert!(config.constraints.is_empty());

        // statistics default to fully unknown values
        assert_eq!(
            config.file_source.statistics().unwrap().num_rows,
            Precision::Absent
        );
        assert_eq!(
            config.file_source.statistics().unwrap().total_byte_size,
            Precision::Absent
        );
        assert_eq!(
            config
                .file_source
                .statistics()
                .unwrap()
                .column_statistics
                .len(),
            file_schema.fields().len()
        );
        for stat in config.file_source.statistics().unwrap().column_statistics {
            assert_eq!(stat.distinct_count, Precision::Absent);
            assert_eq!(stat.min_value, Precision::Absent);
            assert_eq!(stat.max_value, Precision::Absent);
            assert_eq!(stat.null_count, Precision::Absent);
        }
    }

    #[test]
    fn test_file_scan_config_builder_new_from() {
        let schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
        let partition_cols = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];
        let file = PartitionedFile::new("test_file.parquet", 100);

        // create a config with non-default values
        let original_config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&schema),
            Arc::clone(&file_source),
        )
        .with_projection_indices(Some(vec![0, 2]))
        .with_limit(Some(10))
        .with_table_partition_cols(partition_cols.clone())
        .with_file(file.clone())
        .with_constraints(Constraints::default())
        .with_newlines_in_values(true)
        .build();

        // round-trip the config through a builder
        let new_builder = FileScanConfigBuilder::from(original_config);

        let new_config = new_builder.build();

        // verify the rebuilt config matches the original
        let partition_cols = partition_cols
            .into_iter()
            .map(Arc::new)
            .collect::<Vec<_>>();
        assert_eq!(new_config.object_store_url, object_store_url);
        assert_eq!(*new_config.file_schema(), schema);
        assert_eq!(
            new_config
                .projection_exprs
                .as_ref()
                .map(|p| p.column_indices()),
            Some(vec![0, 2])
        );
        assert_eq!(new_config.limit, Some(10));
        assert_eq!(*new_config.table_partition_cols(), partition_cols);
        assert_eq!(new_config.file_groups.len(), 1);
        assert_eq!(new_config.file_groups[0].len(), 1);
        assert_eq!(
            new_config.file_groups[0][0].object_meta.location.as_ref(),
            "test_file.parquet"
        );
        assert_eq!(new_config.constraints, Constraints::default());
        assert!(new_config.new_lines_in_values);
    }

    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        struct TestCase {
            name: String,
            file_count: usize,
            overlap_factor: f64,
            target_partitions: usize,
            expected_partition_count: usize,
        }

        let test_cases = vec![
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            TestCase {
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            TestCase {
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                // cannot create more partitions than there are files
                expected_partition_count: 3,
            },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                // a single file yields a single partition
                expected_partition_count: 1,
            },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                // no files yields no partitions
                expected_partition_count: 0,
            },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            // generate synthetic file groups with the requested overlap
            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            // partitions should hold roughly equal numbers of files
            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        // verify that a target of zero partitions is rejected
        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }

    #[test]
    fn test_partition_statistics_projection() {
        use crate::source::DataSourceExec;
        use datafusion_physical_plan::ExecutionPlan;

        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::Int32, false),
            Field::new("col1", DataType::Int32, false),
            Field::new("col2", DataType::Int32, false),
            Field::new("col3", DataType::Int32, false),
        ]));

        // give each column a distinct null count so the projection below can
        // be checked against it
        let file_group_stats = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Exact(1024),
            column_statistics: vec![
                ColumnStatistics {
                    null_count: Precision::Exact(0),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(5),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(10),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(15),
                    ..ColumnStatistics::new_unknown()
                },
            ],
        };

        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
            .with_statistics(Arc::new(file_group_stats));

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            Arc::clone(&schema),
            Arc::new(MockSource::default()),
        )
        .with_projection_indices(Some(vec![0, 2])) // project "col0" and "col2"
        .with_file_groups(vec![file_group])
        .build();

        let exec = DataSourceExec::from_data_source(config);

        let partition_stats = exec.partition_statistics(Some(0)).unwrap();

        assert_eq!(
            partition_stats.column_statistics.len(),
            2,
            "Expected 2 column statistics (projected), but got {}",
            partition_stats.column_statistics.len()
        );

        assert_eq!(
            partition_stats.column_statistics[0].null_count,
            Precision::Exact(0),
            "First projected column should be col0 with 0 nulls"
        );
        assert_eq!(
            partition_stats.column_statistics[1].null_count,
            Precision::Exact(10),
            "Second projected column should be col2 with 10 nulls"
        );

        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
        assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
    }
}