use std::{
    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
use crate::{
    display::FileGroupsDisplay, file::FileSource,
    file_compression_type::FileCompressionType, file_stream::FileStream,
    source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
};
use arrow::datatypes::FieldRef;
use arrow::{
    array::{
        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
        RecordBatchOptions,
    },
    buffer::Buffer,
    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
    Statistics,
};
use datafusion_execution::{
    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
    display::{display_orderings, ProjectSchemaDisplay},
    metrics::ExecutionPlanMetricsSet,
    projection::{all_alias_free_columns, new_projections_for_columns},
    DisplayAs, DisplayFormatType,
};
use datafusion_physical_plan::{
    filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
};

use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};

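/// Describes a scan of a collection of files as a single table.
///
/// `FileScanConfig` carries the format-independent parts of the scan: the
/// object store URL, the file groups to read, the file schema, projection,
/// limit, table partition columns, output orderings, and compression type,
/// and it delegates format-specific behavior to the wrapped [`FileSource`].
/// Instances are normally assembled with [`FileScanConfigBuilder`].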
#[derive(Clone)]
pub struct FileScanConfig {
    pub object_store_url: ObjectStoreUrl,
    pub file_schema: SchemaRef,
    pub file_groups: Vec<FileGroup>,
    pub constraints: Constraints,
    pub projection: Option<Vec<usize>>,
    pub limit: Option<usize>,
    pub table_partition_cols: Vec<FieldRef>,
    pub output_ordering: Vec<LexOrdering>,
    pub file_compression_type: FileCompressionType,
    pub new_lines_in_values: bool,
    pub file_source: Arc<dyn FileSource>,
    pub batch_size: Option<usize>,
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

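/// Builder for [`FileScanConfig`]; any field left unset falls back to a
/// default when [`FileScanConfigBuilder::build`] is called.
///
/// A minimal usage sketch (the URL, schema, source, and file path below are
/// illustrative assumptions, not values defined in this module):
///
/// ```ignore
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
///     .with_projection(Some(vec![0, 2]))
///     .with_limit(Some(100))
///     .with_file(PartitionedFile::new("data.parquet", 1024))
///     .build();
/// ```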
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    file_schema: SchemaRef,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    projection: Option<Vec<usize>>,
    table_partition_cols: Vec<FieldRef>,
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    file_compression_type: Option<FileCompressionType>,
    new_lines_in_values: Option<bool>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl FileScanConfigBuilder {
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        Self {
            object_store_url,
            file_schema,
            file_source,
            file_groups: vec![],
            statistics: None,
            output_ordering: vec![],
            file_compression_type: None,
            new_lines_in_values: None,
            limit: None,
            projection: None,
            table_partition_cols: vec![],
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
        }
    }

    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source;
        self
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
        self.table_partition_cols = table_partition_cols
            .into_iter()
            .map(|f| Arc::new(f) as FieldRef)
            .collect();
        self
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = Some(constraints);
        self
    }

    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
    }

    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.file_groups = file_groups;
        self
    }

    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }

    pub fn with_file(self, file: PartitionedFile) -> Self {
        self.with_file_group(FileGroup::new(vec![file]))
    }

    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = Some(file_compression_type);
        self
    }

    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
        self.new_lines_in_values = Some(new_lines_in_values);
        self
    }

    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_expr_adapter(
        mut self,
        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    ) -> Self {
        self.expr_adapter_factory = expr_adapter;
        self
    }

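    /// Consumes the builder and produces the final [`FileScanConfig`].
    ///
    /// Unset fields get defaults: unknown statistics for the file schema,
    /// default `Constraints`, `FileCompressionType::UNCOMPRESSED`, and
    /// `new_lines_in_values = false`. The statistics and file schema are also
    /// pushed down into the wrapped [`FileSource`].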
    pub fn build(self) -> FileScanConfig {
        let Self {
            object_store_url,
            file_schema,
            file_source,
            limit,
            projection,
            table_partition_cols,
            constraints,
            file_groups,
            statistics,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        } = self;

        let constraints = constraints.unwrap_or_default();
        let statistics =
            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));

        let file_source = file_source
            .with_statistics(statistics.clone())
            .with_schema(Arc::clone(&file_schema));
        let file_compression_type =
            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
        let new_lines_in_values = new_lines_in_values.unwrap_or(false);

        FileScanConfig {
            object_store_url,
            file_schema,
            file_source,
            limit,
            projection,
            table_partition_cols,
            constraints,
            file_groups,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        }
    }
}

impl From<FileScanConfig> for FileScanConfigBuilder {
    fn from(config: FileScanConfig) -> Self {
        Self {
            object_store_url: config.object_store_url,
            file_schema: config.file_schema,
            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
            file_groups: config.file_groups,
            statistics: config.file_source.statistics().ok(),
            output_ordering: config.output_ordering,
            file_compression_type: Some(config.file_compression_type),
            new_lines_in_values: Some(config.new_lines_in_values),
            limit: config.limit,
            projection: config.projection,
            table_partition_cols: config.table_partition_cols,
            constraints: Some(config.constraints),
            batch_size: config.batch_size,
            expr_adapter_factory: config.expr_adapter_factory,
        }
    }
}

impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self
            .file_source
            .with_batch_size(batch_size)
            .with_projection(self);

        let opener = source.create_file_opener(object_store, self, partition);

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let schema = self.projected_schema();
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        let (schema, constraints, _, orderings) = self.project();
        let mut eq_properties =
            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
                .with_constraints(constraints);
        if let Some(filter) = self.file_source.filter() {
            match reassign_predicate_columns(filter, &schema, true) {
                Ok(filter) => {
                    match Self::add_filter_equivalence_info(
                        filter,
                        &mut eq_properties,
                        &schema,
                    ) {
                        Ok(()) => {}
                        Err(e) => {
                            warn!("Failed to add filter equivalence info: {e}");
                            #[cfg(debug_assertions)]
                            panic!("Failed to add filter equivalence info: {e}");
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to reassign predicate columns: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to reassign predicate columns: {e}");
                }
            };
        }
        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_stats())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
            proj_expr
                .expr
                .as_any()
                .downcast_ref::<Column>()
                .map(|expr| expr.index() >= self.file_schema.fields().len())
                .unwrap_or(false)
        });

        let no_aliases = all_alias_free_columns(projection);

        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
            let file_scan = self.clone();
            let source = Arc::clone(&file_scan.file_source);
            let new_projections = new_projections_for_columns(
                projection,
                &file_scan
                    .projection
                    .clone()
                    .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
            );

            Arc::new(
                FileScanConfigBuilder::from(file_scan)
                    .with_projection(Some(new_projections))
                    .with_source(source)
                    .build(),
            ) as _
        }))
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let result = self.file_source.try_pushdown_filters(filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let file_scan_config = FileScanConfigBuilder::from(self.clone())
                    .with_source(new_file_source)
                    .build();
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(file_scan_config) as _),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: result.filters,
                updated_node: None,
            }),
        }
    }
}

impl FileScanConfig {
    fn projection_indices(&self) -> Vec<usize> {
        match &self.projection {
            Some(proj) => proj.clone(),
            None => (0..self.file_schema.fields().len()
                + self.table_partition_cols.len())
                .collect(),
        }
    }

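    /// Returns the statistics of the scan after the projection is applied.
    ///
    /// File columns keep the statistics reported by the [`FileSource`];
    /// partition columns are reported as `ColumnStatistics::new_unknown()`.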
    pub fn projected_stats(&self) -> Statistics {
        let statistics = self.file_source.statistics().unwrap();

        let table_cols_stats = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    statistics.column_statistics[idx].clone()
                } else {
                    ColumnStatistics::new_unknown()
                }
            })
            .collect();

        Statistics {
            num_rows: statistics.num_rows,
            total_byte_size: statistics.total_byte_size,
            column_statistics: table_cols_stats,
        }
    }

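    /// Returns the projected output schema: the selected file columns
    /// followed by any selected table partition columns.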
    pub fn projected_schema(&self) -> Arc<Schema> {
        let table_fields: Vec<_> = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    self.file_schema.field(idx).clone()
                } else {
                    let partition_idx = idx - self.file_schema.fields().len();
                    Arc::unwrap_or_clone(Arc::clone(
                        &self.table_partition_cols[partition_idx],
                    ))
                }
            })
            .collect();

        Arc::new(Schema::new_with_metadata(
            table_fields,
            self.file_schema.metadata().clone(),
        ))
    }

    fn add_filter_equivalence_info(
        filter: Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
        schema: &Schema,
    ) -> Result<()> {
        macro_rules! ignore_dangling_col {
            ($col:expr) => {
                if let Some(col) = $col.as_any().downcast_ref::<Column>() {
                    if schema.index_of(col.name()).is_err() {
                        continue;
                    }
                }
            };
        }

        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
        for (lhs, rhs) in equal_pairs {
            ignore_dangling_col!(lhs);
            ignore_dangling_col!(rhs);
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        Ok(())
    }

    pub fn projected_constraints(&self) -> Constraints {
        let indexes = self.projection_indices();
        self.constraints.project(&indexes).unwrap_or_default()
    }

    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }

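    /// Projects schema, constraints, statistics, and output orderings in one
    /// call, returning the unprojected values directly when there is no
    /// projection and there are no table partition columns.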
    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
        if self.projection.is_none() && self.table_partition_cols.is_empty() {
            return (
                Arc::clone(&self.file_schema),
                self.constraints.clone(),
                self.file_source.statistics().unwrap().clone(),
                self.output_ordering.clone(),
            );
        }

        let schema = self.projected_schema();
        let constraints = self.projected_constraints();
        let stats = self.projected_stats();

        let output_ordering = get_projected_output_ordering(self, &schema);

        (schema, constraints, stats, output_ordering)
    }

    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .map(|col_idx| self.file_schema.field(*col_idx).name())
                .cloned()
                .collect()
        })
    }

    pub fn projected_file_schema(&self) -> SchemaRef {
        let fields = self.file_column_projection_indices().map(|indices| {
            indices
                .iter()
                .map(|col_idx| self.file_schema.field(*col_idx))
                .cloned()
                .collect::<Vec<_>>()
        });

        fields.map_or_else(
            || Arc::clone(&self.file_schema),
            |f| {
                Arc::new(Schema::new_with_metadata(
                    f,
                    self.file_schema.metadata.clone(),
                ))
            },
        )
    }

    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .copied()
                .collect()
        })
    }

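    /// Attempts to distribute files into `target_partitions` groups of
    /// non-overlapping ranges, based on their min/max statistics for
    /// `sort_order`; each file is placed greedily in the smallest compatible
    /// group, extra groups are created only when overlaps make that
    /// impossible, and empty groups are dropped. Returns an error if
    /// `target_partitions` is zero.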
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(DataFusionError::Internal(
                "target_partitions must be greater than 0".to_string(),
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                file_groups_indices.push(vec![idx]);
            }
        }

        file_groups_indices.retain(|group| !group.is_empty());

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

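    /// Groups files into non-overlapping groups that are each sorted by
    /// `sort_order`, using the files' min/max statistics; a new group is
    /// opened only when a file overlaps every existing group.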
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
}

impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        write!(
            f,
            "statistics={:?}, ",
            self.file_source.statistics().unwrap()
        )?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}

impl DisplayAs for FileScanConfig {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let schema = self.projected_schema();
        let orderings = get_projected_output_ordering(self, &schema);

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}

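/// Adds the table's partition (directory) columns to record batches read from
/// individual files.
///
/// The projector records which slots of the projected schema are partition
/// columns and, for each batch, inserts constant arrays built from that
/// file's partition values at those positions.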
pub struct PartitionColumnProjector {
    key_buffer_cache: ZeroBufferGenerators,
    projected_partition_indexes: Vec<(usize, usize)>,
    projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
        let mut idx_map = HashMap::new();
        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
                idx_map.insert(partition_idx, schema_idx);
            }
        }

        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

        Self {
            projected_partition_indexes,
            key_buffer_cache: Default::default(),
            projected_schema,
        }
    }

    pub fn project(
        &mut self,
        file_batch: RecordBatch,
        partition_values: &[ScalarValue],
    ) -> Result<RecordBatch> {
        let expected_cols =
            self.projected_schema.fields().len() - self.projected_partition_indexes.len();

        if file_batch.columns().len() != expected_cols {
            return exec_err!(
                "Unexpected batch schema from file, expected {} cols but got {}",
                expected_cols,
                file_batch.columns().len()
            );
        }

        let mut cols = file_batch.columns().to_vec();
        for &(pidx, sidx) in &self.projected_partition_indexes {
            let p_value =
                partition_values
                    .get(pidx)
                    .ok_or(DataFusionError::Execution(
                        "Invalid partitioning found on disk".to_string(),
                    ))?;

            let mut partition_value = Cow::Borrowed(p_value);

            let field = self.projected_schema.field(sidx);
            let expected_data_type = field.data_type();
            let actual_data_type = partition_value.data_type();
            if let DataType::Dictionary(key_type, _) = expected_data_type {
                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
                    warn!(
                        "Partition value for column {} was not dictionary-encoded, applied auto-fix.",
                        field.name()
                    );
                    partition_value = Cow::Owned(ScalarValue::Dictionary(
                        key_type.clone(),
                        Box::new(partition_value.as_ref().clone()),
                    ));
                }
            }

            cols.insert(
                sidx,
                create_output_array(
                    &mut self.key_buffer_cache,
                    partition_value.as_ref(),
                    file_batch.num_rows(),
                )?,
            )
        }

        RecordBatch::try_new_with_options(
            Arc::clone(&self.projected_schema),
            cols,
            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
        )
        .map_err(Into::into)
    }
}

#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}

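/// Lazily builds and caches an all-zero key buffer for the native type `T`.
///
/// A partition column is constant within a batch, so every dictionary key is
/// 0; a single zeroed buffer can therefore be sliced and reused for any batch
/// no longer than the cached buffer.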
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    cache: Option<Buffer>,
    _t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    const SIZE: usize = size_of::<T>();

    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
        match &mut self.cache {
            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
                buf.slice_with_length(0, n_vals * Self::SIZE)
            }
            _ => {
                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals);
                self.cache.insert(key_buffer_builder.finish()).clone()
            }
        }
    }
}

fn create_dict_array<T>(
    buffer_gen: &mut ZeroBufferGenerator<T>,
    dict_val: &ScalarValue,
    len: usize,
    data_type: DataType,
) -> Result<ArrayRef>
where
    T: ArrowNativeType,
{
    let dict_vals = dict_val.to_array()?;

    let sliced_key_buffer = buffer_gen.get_buffer(len);

    let mut builder = ArrayData::builder(data_type)
        .len(len)
        .add_buffer(sliced_key_buffer);
    builder = builder.add_child_data(dict_vals.to_data());
    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
        builder.build().unwrap(),
    )))
}

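/// Materializes a partition value as an array of length `len`: dictionary
/// values reuse the cached zero-key buffers, while all other values fall back
/// to `ScalarValue::to_array_of_size`.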
fn create_output_array(
    key_buffer_cache: &mut ZeroBufferGenerators,
    val: &ScalarValue,
    len: usize,
) -> Result<ArrayRef> {
    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
        match key_type.as_ref() {
            DataType::Int8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            _ => {}
        }
    }

    val.to_array_of_size(len)
}

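/// Maps the configured output orderings onto the projected schema, keeping
/// only the leading prefix of sort expressions whose columns survive the
/// projection. An ordering is dropped entirely if any multi-file group cannot
/// be shown, via min/max statistics, to be sorted under it.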
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let mut all_orderings = vec![];
    for output_ordering in &base_config.output_ordering {
        let mut new_ordering = vec![];
        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                let name = col.name();
                if let Some((idx, _)) = projected_schema.column_with_name(name) {
                    new_ordering.push(PhysicalSortExpr::new(
                        Arc::new(Column::new(name, idx)),
                        *options,
                    ));
                    continue;
                }
            }
            break;
        }

        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
            continue;
        };

        if base_config.file_groups.iter().any(|group| {
            if group.len() <= 1 {
                return false;
            }

            let statistics = match MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                base_config.projection.as_deref(),
                group.iter(),
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    return true;
                }
            };

            !statistics.is_sorted()
        }) {
            debug!(
                "Skipping specified output ordering {:?}. \
                 Some file groups couldn't be determined to be sorted: {:?}",
                base_config.output_ordering[0], base_config.file_groups
            );
            continue;
        }

        all_orderings.push(new_ordering);
    }
    all_orderings
}

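/// Wraps a partition column type in the `Dictionary(UInt16, ...)` encoding
/// used for the constant partition values appended to each batch.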
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::col;
    use crate::{
        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
        verify_sort_integrity,
    };

    use arrow::array::{Int32Array, RecordBatch};
    use datafusion_common::stats::Precision;
    use datafusion_common::{assert_batches_eq, internal_err};
    use datafusion_expr::{Operator, SortExpr};
    use datafusion_physical_expr::create_physical_sort_expr;
    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

    pub fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }

    #[test]
    fn physical_plan_config_no_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            proj_schema.field(file_schema.fields().len()).name(),
            "date",
            "partition columns are the last columns"
        );
        assert_eq!(
            proj_statistics.column_statistics.len(),
            file_schema.fields().len() + 1
        );
        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, None);

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, None);
    }

    #[test]
    fn physical_plan_config_no_projection_tab_cols_as_field() {
        let file_schema = aggr_test_schema();

        let table_partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
                .with_metadata(HashMap::from_iter(vec![(
                    "key_whatever".to_owned(),
                    "value_whatever".to_owned(),
                )]));

        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![table_partition_col.clone()],
        );

        let proj_schema = conf.projected_schema();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            *proj_schema.field(file_schema.fields().len()),
            table_partition_col,
            "partition columns are the last columns and must have all values defined in the created field"
        );
    }

    #[test]
    fn physical_plan_config_with_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            Some(vec![file_schema.fields().len(), 0]),
            Statistics {
                num_rows: Precision::Inexact(10),
                column_statistics: (0..file_schema.fields().len())
                    .map(|i| ColumnStatistics {
                        distinct_count: Precision::Inexact(i),
                        ..Default::default()
                    })
                    .collect(),
                total_byte_size: Precision::Absent,
            },
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(
            columns(&proj_schema),
            vec!["date".to_owned(), "c1".to_owned()]
        );
        let proj_stat_cols = proj_statistics.column_statistics;
        assert_eq!(proj_stat_cols.len(), 2);
        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));

        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, Some(vec!["c1".to_owned()]));

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, Some(vec![0]));
    }

    #[test]
    fn partition_column_projector() {
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let partition_cols = vec![
            (
                "year".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "month".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "day".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];
        let statistics = Statistics {
            num_rows: Precision::Inexact(3),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&file_batch.schema()),
        };

        let conf = config_for_projection(
            file_batch.schema(),
            Some(vec![
                0,
                1,
                2,
                file_batch.schema().fields().len(),
                file_batch.schema().fields().len() + 2,
            ]),
            statistics.clone(),
            to_partition_cols(partition_cols.clone()),
        );

        let source_statistics = conf.file_source.statistics().unwrap();
        let conf_stats = conf.statistics().unwrap();

        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

        assert_eq!(conf_stats.column_statistics.len(), 5);

        assert_eq!(source_statistics, statistics);
        assert_eq!(source_statistics.column_statistics.len(), 3);

        let proj_schema = conf.projected_schema();
        let mut proj = PartitionColumnProjector::new(
            proj_schema,
            &partition_cols
                .iter()
                .map(|x| x.0.clone())
                .collect::<Vec<_>>(),
        );

        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("26")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![5, 6, 7, 8, 9]),
            ("b", &vec![-10, -9, -8, -7, -6]),
            ("c", &vec![12, 13, 14, 15, 16]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("27")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+-----+----+------+-----+",
            "| a | b   | c  | year | day |",
            "+---+-----+----+------+-----+",
            "| 5 | -10 | 12 | 2021 | 27  |",
            "| 6 | -9  | 13 | 2021 | 27  |",
            "| 7 | -8  | 14 | 2021 | 27  |",
            "| 8 | -7  | 15 | 2021 | 27  |",
            "| 9 | -6  | 16 | 2021 | 27  |",
            "+---+-----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 3]),
            ("b", &vec![2, 3, 4]),
            ("c", &vec![4, 5, 6]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("28")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+---+---+------+-----+",
            "| a | b | c | year | day |",
            "+---+---+---+------+-----+",
            "| 0 | 2 | 4 | 2021 | 28  |",
            "| 1 | 3 | 5 | 2021 | 28  |",
            "| 3 | 4 | 6 | 2021 | 28  |",
            "+---+---+---+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    ScalarValue::from("2021"),
                    ScalarValue::from("10"),
                    ScalarValue::from("26"),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);
    }

    #[test]
    fn test_projected_file_schema_with_partition_col() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            Some(vec![0, 3, 5, schema.fields().len()]),
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        let expected_columns = vec!["c1", "c4", "c6"];
        let actual_columns = projection
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect::<Vec<_>>();
        assert_eq!(expected_columns, actual_columns);
    }

    #[test]
    fn test_projected_file_schema_without_projection() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            None,
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        assert_eq!(projection.fields(), schema.fields());
    }

    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{path::Path, ObjectMeta};

        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
            },
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
            },
        ];

        for case in cases {
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                PartitionedFile {
                    object_meta: ObjectMeta {
                        location: Path::from(format!(
                            "data/date={}/{}.parquet",
                            file.date, file.name
                        )),
                        last_modified: chrono::Utc.timestamp_nanos(0),
                        size: 0,
                        e_tag: None,
                        version: None,
                    },
                    partition_values: vec![ScalarValue::from(file.date)],
                    range: None,
                    statistics: Some(Arc::new(Statistics {
                        num_rows: Precision::Absent,
                        total_byte_size: Precision::Absent,
                        column_statistics: file
                            .statistics
                            .into_iter()
                            .map(|stats| {
                                stats
                                    .map(|(min, max)| ColumnStatistics {
                                        min_value: Precision::Exact(
                                            ScalarValue::Float64(min),
                                        ),
                                        max_value: Precision::Exact(
                                            ScalarValue::Float64(max),
                                        ),
                                        ..Default::default()
                                    })
                                    .unwrap_or_default()
                            })
                            .collect::<Vec<_>>(),
                    })),
                    extensions: None,
                    metadata_size_hint: None,
                }
            }
        }
    }

    fn config_for_projection(
        file_schema: SchemaRef,
        projection: Option<Vec<usize>>,
        statistics: Statistics,
        table_partition_cols: Vec<Field>,
    ) -> FileScanConfig {
        FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema,
            Arc::new(MockSource::default()),
        )
        .with_projection(projection)
        .with_statistics(statistics)
        .with_table_partition_cols(table_partition_cols)
        .build()
    }

    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
        table_partition_cols
            .iter()
            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
            .collect::<Vec<_>>()
    }

    pub fn build_table_i32(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
    ) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new(a.0, DataType::Int32, false),
            Field::new(b.0, DataType::Int32, false),
            Field::new(c.0, DataType::Int32, false),
        ]);

        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(a.1.clone())),
                Arc::new(Int32Array::from(b.1.clone())),
                Arc::new(Int32Array::from(c.1.clone())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_file_scan_config_builder() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        let builder = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        );

        let config = builder
            .with_limit(Some(1000))
            .with_projection(Some(vec![0, 1]))
            .with_table_partition_cols(vec![Field::new(
                "date",
                wrap_partition_type_in_dict(DataType::Utf8),
                false,
            )])
            .with_statistics(Statistics::new_unknown(&file_schema))
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "test.parquet".to_string(),
                1024,
            )])])
            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
                Column::new("date", 0),
            ))]
            .into()])
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .with_newlines_in_values(true)
            .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(config.file_schema, file_schema);
        assert_eq!(config.limit, Some(1000));
        assert_eq!(config.projection, Some(vec![0, 1]));
        assert_eq!(config.table_partition_cols.len(), 1);
        assert_eq!(config.table_partition_cols[0].name(), "date");
        assert_eq!(config.file_groups.len(), 1);
        assert_eq!(config.file_groups[0].len(), 1);
        assert_eq!(
            config.file_groups[0][0].object_meta.location.as_ref(),
            "test.parquet"
        );
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(config.new_lines_in_values);
        assert_eq!(config.output_ordering.len(), 1);
    }

    #[test]
    fn equivalence_properties_after_schema_change() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
                col("c2", &file_schema).unwrap(),
                Operator::Eq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            ))));

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        )
        .with_projection(Some(vec![0, 1, 2]))
        .build();

        let data_source = config
            .try_swapping_with_projection(&[ProjectionExpr::new(
                col("c3", &file_schema).unwrap(),
                "c3".to_string(),
            )])
            .unwrap()
            .unwrap();

        let eq_properties = data_source.eq_properties();
        let eq_group = eq_properties.eq_group();

        for class in eq_group.iter() {
            for expr in class.iter() {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    assert_ne!(
                        col.name(),
                        "c2",
                        "c2 should not be present in any equivalence class"
                    );
                }
            }
        }
    }

    #[test]
    fn test_file_scan_config_builder_defaults() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        )
        .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(config.file_schema, file_schema);
        assert_eq!(config.limit, None);
        assert_eq!(config.projection, None);
        assert!(config.table_partition_cols.is_empty());
        assert!(config.file_groups.is_empty());
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(!config.new_lines_in_values);
        assert!(config.output_ordering.is_empty());
        assert!(config.constraints.is_empty());

        assert_eq!(
            config.file_source.statistics().unwrap().num_rows,
            Precision::Absent
        );
        assert_eq!(
            config.file_source.statistics().unwrap().total_byte_size,
            Precision::Absent
        );
        assert_eq!(
            config
                .file_source
                .statistics()
                .unwrap()
                .column_statistics
                .len(),
            file_schema.fields().len()
        );
        for stat in config.file_source.statistics().unwrap().column_statistics {
            assert_eq!(stat.distinct_count, Precision::Absent);
            assert_eq!(stat.min_value, Precision::Absent);
            assert_eq!(stat.max_value, Precision::Absent);
            assert_eq!(stat.null_count, Precision::Absent);
        }
    }

    #[test]
    fn test_file_scan_config_builder_new_from() {
        let schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
        let partition_cols = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];
        let file = PartitionedFile::new("test_file.parquet", 100);

        let original_config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&schema),
            Arc::clone(&file_source),
        )
        .with_projection(Some(vec![0, 2]))
        .with_limit(Some(10))
        .with_table_partition_cols(partition_cols.clone())
        .with_file(file.clone())
        .with_constraints(Constraints::default())
        .with_newlines_in_values(true)
        .build();

        let new_builder = FileScanConfigBuilder::from(original_config);

        let new_config = new_builder.build();

        let partition_cols = partition_cols
            .into_iter()
            .map(Arc::new)
            .collect::<Vec<_>>();
        assert_eq!(new_config.object_store_url, object_store_url);
        assert_eq!(new_config.file_schema, schema);
        assert_eq!(new_config.projection, Some(vec![0, 2]));
        assert_eq!(new_config.limit, Some(10));
        assert_eq!(new_config.table_partition_cols, partition_cols);
        assert_eq!(new_config.file_groups.len(), 1);
        assert_eq!(new_config.file_groups[0].len(), 1);
        assert_eq!(
            new_config.file_groups[0][0].object_meta.location.as_ref(),
            "test_file.parquet"
        );
        assert_eq!(new_config.constraints, Constraints::default());
        assert!(new_config.new_lines_in_values);
    }

    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        struct TestCase {
            name: String,
            file_count: usize,
            overlap_factor: f64,
            target_partitions: usize,
            expected_partition_count: usize,
        }

        let test_cases = vec![
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            TestCase {
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            TestCase {
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                expected_partition_count: 3,
            },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                expected_partition_count: 1,
            },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                expected_partition_count: 0,
            },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }
}