use std::{
    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
use crate::{
    display::FileGroupsDisplay, file::FileSource,
    file_compression_type::FileCompressionType, file_stream::FileStream,
    source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
};
use arrow::datatypes::FieldRef;
use arrow::{
    array::{
        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
        RecordBatchOptions,
    },
    buffer::Buffer,
    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
    Statistics,
};
use datafusion_execution::{
    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
    display::{display_orderings, ProjectSchemaDisplay},
    metrics::ExecutionPlanMetricsSet,
    projection::{all_alias_free_columns, new_projections_for_columns},
    DisplayAs, DisplayFormatType,
};
use datafusion_physical_plan::{
    filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
};

use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};

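/// Common configuration shared by all file-format based [`DataSource`]s
/// (Parquet, CSV, JSON, Avro, ...).
///
/// It describes which object store to read from, the schema of the files, the
/// groups of files scanned per partition, an optional projection and limit,
/// table partition columns, output orderings, compression, and the
/// format-specific [`FileSource`] that actually decodes the files.
///
/// Instances are normally created through [`FileScanConfigBuilder`] rather
/// than by filling in the fields directly.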
#[derive(Clone)]
pub struct FileScanConfig {
    pub object_store_url: ObjectStoreUrl,
    pub file_schema: SchemaRef,
    pub file_groups: Vec<FileGroup>,
    pub constraints: Constraints,
    pub projection: Option<Vec<usize>>,
    pub limit: Option<usize>,
    pub table_partition_cols: Vec<FieldRef>,
    pub output_ordering: Vec<LexOrdering>,
    pub file_compression_type: FileCompressionType,
    pub new_lines_in_values: bool,
    pub file_source: Arc<dyn FileSource>,
    pub batch_size: Option<usize>,
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

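/// Builder for [`FileScanConfig`].
///
/// Collects the required pieces (object store URL, file schema, file source)
/// plus any optional settings, then assembles the final config via
/// [`FileScanConfigBuilder::build`].
///
/// A minimal usage sketch (the object store URL, schema, and file source are
/// assumed to be supplied by the caller; the file name is illustrative):
///
/// ```ignore
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
///     // read only the first two columns
///     .with_projection(Some(vec![0, 1]))
///     // stop after 1000 rows
///     .with_limit(Some(1000))
///     // scan a single 1024-byte file
///     .with_file(PartitionedFile::new("example.parquet".to_string(), 1024))
///     .build();
/// ```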
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    file_schema: SchemaRef,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    projection: Option<Vec<usize>>,
    table_partition_cols: Vec<FieldRef>,
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    file_compression_type: Option<FileCompressionType>,
    new_lines_in_values: Option<bool>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl FileScanConfigBuilder {
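    /// Create a builder from the three required parts: the object store URL,
    /// the schema of the files to be scanned (excluding partition columns),
    /// and the format-specific [`FileSource`]. Everything else starts at its
    /// default and can be set with the `with_*` methods.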
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        Self {
            object_store_url,
            file_schema,
            file_source,
            file_groups: vec![],
            statistics: None,
            output_ordering: vec![],
            file_compression_type: None,
            new_lines_in_values: None,
            limit: None,
            projection: None,
            table_partition_cols: vec![],
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
        }
    }

    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source;
        self
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
        self.table_partition_cols = table_partition_cols
            .into_iter()
            .map(|f| Arc::new(f) as FieldRef)
            .collect();
        self
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = Some(constraints);
        self
    }

    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
    }

    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.file_groups = file_groups;
        self
    }

    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }

    pub fn with_file(self, file: PartitionedFile) -> Self {
        self.with_file_group(FileGroup::new(vec![file]))
    }

    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = Some(file_compression_type);
        self
    }

    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
        self.new_lines_in_values = Some(new_lines_in_values);
        self
    }

    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_expr_adapter(
        mut self,
        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    ) -> Self {
        self.expr_adapter_factory = expr_adapter;
        self
    }

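    /// Consume the builder and produce the final [`FileScanConfig`].
    ///
    /// Missing optional values fall back to defaults: constraints default to
    /// empty, statistics to [`Statistics::new_unknown`], compression to
    /// `UNCOMPRESSED`, and `new_lines_in_values` to `false`. The statistics
    /// and schema are also pushed into the wrapped [`FileSource`].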
    pub fn build(self) -> FileScanConfig {
        let Self {
            object_store_url,
            file_schema,
            file_source,
            limit,
            projection,
            table_partition_cols,
            constraints,
            file_groups,
            statistics,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        } = self;

        let constraints = constraints.unwrap_or_default();
        let statistics =
            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));

        let file_source = file_source
            .with_statistics(statistics.clone())
            .with_schema(Arc::clone(&file_schema));
        let file_compression_type =
            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
        let new_lines_in_values = new_lines_in_values.unwrap_or(false);

        FileScanConfig {
            object_store_url,
            file_schema,
            file_source,
            limit,
            projection,
            table_partition_cols,
            constraints,
            file_groups,
            output_ordering,
            file_compression_type,
            new_lines_in_values,
            batch_size,
            expr_adapter_factory: expr_adapter,
        }
    }
}

impl From<FileScanConfig> for FileScanConfigBuilder {
    fn from(config: FileScanConfig) -> Self {
        Self {
            object_store_url: config.object_store_url,
            file_schema: config.file_schema,
            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
            file_groups: config.file_groups,
            statistics: config.file_source.statistics().ok(),
            output_ordering: config.output_ordering,
            file_compression_type: Some(config.file_compression_type),
            new_lines_in_values: Some(config.new_lines_in_values),
            limit: config.limit,
            projection: config.projection,
            table_partition_cols: config.table_partition_cols,
            constraints: Some(config.constraints),
            batch_size: config.batch_size,
            expr_adapter_factory: config.expr_adapter_factory,
        }
    }
}

impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self
            .file_source
            .with_batch_size(batch_size)
            .with_projection(self);

        let opener = source.create_file_opener(object_store, self, partition);

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let schema = self.projected_schema();
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        let (schema, constraints, _, orderings) = self.project();
        let mut eq_properties =
            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
                .with_constraints(constraints);
        if let Some(filter) = self.file_source.filter() {
            match reassign_predicate_columns(filter, &schema, true) {
                Ok(filter) => {
                    match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
                        Ok(()) => {}
                        Err(e) => {
                            warn!("Failed to add filter equivalence info: {e}");
                            #[cfg(debug_assertions)]
                            panic!("Failed to add filter equivalence info: {e}");
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to reassign predicate columns: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to reassign predicate columns: {e}");
                }
            };
        }
        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_stats())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &[ProjectionExpr],
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
            proj_expr
                .expr
                .as_any()
                .downcast_ref::<Column>()
                .map(|expr| expr.index() >= self.file_schema.fields().len())
                .unwrap_or(false)
        });

        let no_aliases = all_alias_free_columns(projection);

        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
            let file_scan = self.clone();
            let source = Arc::clone(&file_scan.file_source);
            let new_projections = new_projections_for_columns(
                projection,
                &file_scan
                    .projection
                    .clone()
                    .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
            );

            Arc::new(
                FileScanConfigBuilder::from(file_scan)
                    .with_projection(Some(new_projections))
                    .with_source(source)
                    .build(),
            ) as _
        }))
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let result = self.file_source.try_pushdown_filters(filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let file_scan_config = FileScanConfigBuilder::from(self.clone())
                    .with_source(new_file_source)
                    .build();
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(file_scan_config) as _),
                })
            }
            None => {
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: None,
                })
            }
        }
    }
}

impl FileScanConfig {
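    /// Column indices produced by the scan: the explicit projection if one
    /// was set, otherwise all file columns followed by all table partition
    /// columns.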
    fn projection_indices(&self) -> Vec<usize> {
        match &self.projection {
            Some(proj) => proj.clone(),
            None => (0..self.file_schema.fields().len()
                + self.table_partition_cols.len())
                .collect(),
        }
    }

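    /// Projected statistics for the scan; partition columns get unknown
    /// column statistics since they are not covered by the file statistics.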
    pub fn projected_stats(&self) -> Statistics {
        let statistics = self.file_source.statistics().unwrap();

        let table_cols_stats = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    statistics.column_statistics[idx].clone()
                } else {
                    ColumnStatistics::new_unknown()
                }
            })
            .collect();

        Statistics {
            num_rows: statistics.num_rows,
            total_byte_size: statistics.total_byte_size,
            column_statistics: table_cols_stats,
        }
    }

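    /// Projected output schema: file fields and table partition fields
    /// selected by [`Self::projection_indices`], with the file schema's
    /// metadata carried over.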
    pub fn projected_schema(&self) -> Arc<Schema> {
        let table_fields: Vec<_> = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    self.file_schema.field(idx).clone()
                } else {
                    let partition_idx = idx - self.file_schema.fields().len();
                    Arc::unwrap_or_clone(Arc::clone(
                        &self.table_partition_cols[partition_idx],
                    ))
                }
            })
            .collect();

        Arc::new(Schema::new_with_metadata(
            table_fields,
            self.file_schema.metadata().clone(),
        ))
    }

    fn add_filter_equivalence_info(
        filter: Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
    ) -> Result<()> {
        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
        }
        Ok(())
    }

    pub fn projected_constraints(&self) -> Constraints {
        let indexes = self.projection_indices();
        self.constraints.project(&indexes).unwrap_or_default()
    }

    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }

    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
        if self.projection.is_none() && self.table_partition_cols.is_empty() {
            return (
                Arc::clone(&self.file_schema),
                self.constraints.clone(),
                self.file_source.statistics().unwrap().clone(),
                self.output_ordering.clone(),
            );
        }

        let schema = self.projected_schema();
        let constraints = self.projected_constraints();
        let stats = self.projected_stats();

        let output_ordering = get_projected_output_ordering(self, &schema);

        (schema, constraints, stats, output_ordering)
    }

    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .map(|col_idx| self.file_schema.field(*col_idx).name())
                .cloned()
                .collect()
        })
    }

    pub fn projected_file_schema(&self) -> SchemaRef {
        let fields = self.file_column_projection_indices().map(|indices| {
            indices
                .iter()
                .map(|col_idx| self.file_schema.field(*col_idx))
                .cloned()
                .collect::<Vec<_>>()
        });

        fields.map_or_else(
            || Arc::clone(&self.file_schema),
            |f| {
                Arc::new(Schema::new_with_metadata(
                    f,
                    self.file_schema.metadata.clone(),
                ))
            },
        )
    }

    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .copied()
                .collect()
        })
    }

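    /// Redistribute the given files into roughly `target_partitions` groups
    /// using their min/max statistics on `sort_order`: each file is appended
    /// to the least-loaded group it does not overlap with, and a new group is
    /// started only when no existing group fits, so heavily overlapping files
    /// may produce more groups than requested. Returns an error if
    /// `target_partitions` is zero or the required statistics are missing.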
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(DataFusionError::Internal(
                "target_partitions must be greater than 0".to_string(),
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                file_groups_indices.push(vec![idx]);
            }
        }

        file_groups_indices.retain(|group| !group.is_empty());

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

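    /// Greedily regroup the given files so that, within each returned group,
    /// files do not overlap and appear in `sort_order` according to their
    /// min/max statistics. Each file is placed into the first compatible
    /// group, or a new group when none fits. Returns an error if statistics
    /// for the sort columns cannot be collected.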
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
}

impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        write!(
            f,
            "statistics={:?}, ",
            self.file_source.statistics().unwrap()
        )?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}

impl DisplayAs for FileScanConfig {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let schema = self.projected_schema();
        let orderings = get_projected_output_ordering(self, &schema);

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}

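/// Helper that adds the table's partition columns to incoming record batches.
///
/// File readers produce batches containing only the columns physically stored
/// in the files; this projector appends the per-file partition values (e.g.
/// from a `year=2021/month=10` style path) as extra columns at the positions
/// expected by the projected schema, reusing a cached zero-filled key buffer
/// for dictionary-encoded partition values.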
pub struct PartitionColumnProjector {
    key_buffer_cache: ZeroBufferGenerators,
    projected_partition_indexes: Vec<(usize, usize)>,
    projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
        let mut idx_map = HashMap::new();
        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
                idx_map.insert(partition_idx, schema_idx);
            }
        }

        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

        Self {
            projected_partition_indexes,
            key_buffer_cache: Default::default(),
            projected_schema,
        }
    }

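    /// Insert the partition columns into `file_batch` at the positions
    /// recorded in the projected schema. Returns an error if the batch does
    /// not have the expected number of non-partition columns, or if a
    /// partition value is missing. Non-dictionary partition values for
    /// dictionary-typed columns are wrapped automatically (with a warning).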
    pub fn project(
        &mut self,
        file_batch: RecordBatch,
        partition_values: &[ScalarValue],
    ) -> Result<RecordBatch> {
        let expected_cols =
            self.projected_schema.fields().len() - self.projected_partition_indexes.len();

        if file_batch.columns().len() != expected_cols {
            return exec_err!(
                "Unexpected batch schema from file, expected {} cols but got {}",
                expected_cols,
                file_batch.columns().len()
            );
        }

        let mut cols = file_batch.columns().to_vec();
        for &(pidx, sidx) in &self.projected_partition_indexes {
            let p_value =
                partition_values
                    .get(pidx)
                    .ok_or(DataFusionError::Execution(
                        "Invalid partitioning found on disk".to_string(),
                    ))?;

            let mut partition_value = Cow::Borrowed(p_value);

            let field = self.projected_schema.field(sidx);
            let expected_data_type = field.data_type();
            let actual_data_type = partition_value.data_type();
            if let DataType::Dictionary(key_type, _) = expected_data_type {
                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
                    warn!(
                        "Partition value for column {} was not dictionary-encoded, applied auto-fix.",
                        field.name()
                    );
                    partition_value = Cow::Owned(ScalarValue::Dictionary(
                        key_type.clone(),
                        Box::new(partition_value.as_ref().clone()),
                    ));
                }
            }

            cols.insert(
                sidx,
                create_output_array(
                    &mut self.key_buffer_cache,
                    partition_value.as_ref(),
                    file_batch.num_rows(),
                )?,
            )
        }

        RecordBatch::try_new_with_options(
            Arc::clone(&self.projected_schema),
            cols,
            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
        )
        .map_err(Into::into)
    }
}

#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}

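/// Lazily builds and caches a zero-filled key [`Buffer`] for dictionary
/// arrays whose every key refers to dictionary entry 0; the cached buffer is
/// sliced and reused for subsequent requests that fit within it.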
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    cache: Option<Buffer>,
    _t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    const SIZE: usize = size_of::<T>();

    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
        match &mut self.cache {
            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
                buf.slice_with_length(0, n_vals * Self::SIZE)
            }
            _ => {
                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals);
                self.cache.insert(key_buffer_builder.finish()).clone()
            }
        }
    }
}

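/// Build a dictionary array of length `len` whose keys all point at the
/// single value `dict_val`, using the shared zero-filled key buffer from
/// `buffer_gen`.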
fn create_dict_array<T>(
    buffer_gen: &mut ZeroBufferGenerator<T>,
    dict_val: &ScalarValue,
    len: usize,
    data_type: DataType,
) -> Result<ArrayRef>
where
    T: ArrowNativeType,
{
    let dict_vals = dict_val.to_array()?;

    let sliced_key_buffer = buffer_gen.get_buffer(len);

    let mut builder = ArrayData::builder(data_type)
        .len(len)
        .add_buffer(sliced_key_buffer);
    builder = builder.add_child_data(dict_vals.to_data());
    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
        builder.build().unwrap(),
    )))
}

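/// Materialize a partition value as an array of length `len`. Dictionary
/// scalars are expanded through [`create_dict_array`] for the supported key
/// types; anything else falls back to [`ScalarValue::to_array_of_size`].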
fn create_output_array(
    key_buffer_cache: &mut ZeroBufferGenerators,
    val: &ScalarValue,
    len: usize,
) -> Result<ArrayRef> {
    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
        match key_type.as_ref() {
            DataType::Int8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            _ => {}
        }
    }

    val.to_array_of_size(len)
}

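/// Map each declared output ordering onto the projected schema, keeping the
/// longest prefix of sort columns that survive the projection. Orderings that
/// become empty, or whose multi-file groups cannot be shown via min/max
/// statistics to actually be sorted, are dropped.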
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let mut all_orderings = vec![];
    for output_ordering in &base_config.output_ordering {
        let mut new_ordering = vec![];
        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                let name = col.name();
                if let Some((idx, _)) = projected_schema.column_with_name(name) {
                    new_ordering.push(PhysicalSortExpr::new(
                        Arc::new(Column::new(name, idx)),
                        *options,
                    ));
                    continue;
                }
            }
            break;
        }

        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
            continue;
        };

        if base_config.file_groups.iter().any(|group| {
            if group.len() <= 1 {
                return false;
            }

            let statistics = match MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                base_config.projection.as_deref(),
                group.iter(),
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    return true;
                }
            };

            !statistics.is_sorted()
        }) {
            debug!(
                "Skipping specified output ordering {:?}. \
                Some file groups couldn't be determined to be sorted: {:?}",
                base_config.output_ordering[0], base_config.file_groups
            );
            continue;
        }

        all_orderings.push(new_ordering);
    }
    all_orderings
}

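/// Wrap a partition column's value type in a `Dictionary(UInt16, ...)` type;
/// partition values repeat for every row of a file, so dictionary encoding
/// keeps the materialized columns small.
///
/// A small sketch of the resulting type:
///
/// ```ignore
/// let dict_type = wrap_partition_type_in_dict(DataType::Utf8);
/// assert_eq!(
///     dict_type,
///     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
/// );
/// ```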
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

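/// Wrap a partition value in a dictionary-encoded [`ScalarValue`], matching
/// the type produced by [`wrap_partition_type_in_dict`].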
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
        verify_sort_integrity,
    };

    use arrow::array::{Int32Array, RecordBatch};
    use datafusion_common::stats::Precision;
    use datafusion_common::{assert_batches_eq, internal_err};
    use datafusion_expr::SortExpr;
    use datafusion_physical_expr::create_physical_sort_expr;

    pub fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }

    #[test]
    fn physical_plan_config_no_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            proj_schema.field(file_schema.fields().len()).name(),
            "date",
            "partition columns are the last columns"
        );
        assert_eq!(
            proj_statistics.column_statistics.len(),
            file_schema.fields().len() + 1
        );
        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, None);

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, None);
    }

    #[test]
    fn physical_plan_config_no_projection_tab_cols_as_field() {
        let file_schema = aggr_test_schema();

        let table_partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
                .with_metadata(HashMap::from_iter(vec![(
                    "key_whatever".to_owned(),
                    "value_whatever".to_owned(),
                )]));

        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![table_partition_col.clone()],
        );

        let proj_schema = conf.projected_schema();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            *proj_schema.field(file_schema.fields().len()),
            table_partition_col,
1526 "partition columns are the last columns and ust have all values defined in created field"
        );
    }

    #[test]
    fn physical_plan_config_with_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            Some(vec![file_schema.fields().len(), 0]),
            Statistics {
                num_rows: Precision::Inexact(10),
                column_statistics: (0..file_schema.fields().len())
                    .map(|i| ColumnStatistics {
                        distinct_count: Precision::Inexact(i),
                        ..Default::default()
                    })
                    .collect(),
                total_byte_size: Precision::Absent,
            },
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(
            columns(&proj_schema),
            vec!["date".to_owned(), "c1".to_owned()]
        );
        let proj_stat_cols = proj_statistics.column_statistics;
        assert_eq!(proj_stat_cols.len(), 2);
        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));

        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, Some(vec!["c1".to_owned()]));

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, Some(vec![0]));
    }

    #[test]
    fn partition_column_projector() {
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let partition_cols = vec![
            (
                "year".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "month".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "day".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];
        let statistics = Statistics {
            num_rows: Precision::Inexact(3),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&file_batch.schema()),
        };

        let conf = config_for_projection(
            file_batch.schema(),
            Some(vec![
                0,
                1,
                2,
                file_batch.schema().fields().len(),
                file_batch.schema().fields().len() + 2,
            ]),
            statistics.clone(),
            to_partition_cols(partition_cols.clone()),
        );

        let source_statistics = conf.file_source.statistics().unwrap();
        let conf_stats = conf.statistics().unwrap();

        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

        assert_eq!(conf_stats.column_statistics.len(), 5);

        assert_eq!(source_statistics, statistics);
        assert_eq!(source_statistics.column_statistics.len(), 3);

        let proj_schema = conf.projected_schema();
        let mut proj = PartitionColumnProjector::new(
            proj_schema,
            &partition_cols
                .iter()
                .map(|x| x.0.clone())
                .collect::<Vec<_>>(),
        );

        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("26")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![5, 6, 7, 8, 9]),
            ("b", &vec![-10, -9, -8, -7, -6]),
            ("c", &vec![12, 13, 14, 15, 16]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("27")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+-----+----+------+-----+",
            "| a | b   | c  | year | day |",
            "+---+-----+----+------+-----+",
            "| 5 | -10 | 12 | 2021 | 27  |",
            "| 6 | -9  | 13 | 2021 | 27  |",
            "| 7 | -8  | 14 | 2021 | 27  |",
            "| 8 | -7  | 15 | 2021 | 27  |",
            "| 9 | -6  | 16 | 2021 | 27  |",
            "+---+-----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 3]),
            ("b", &vec![2, 3, 4]),
            ("c", &vec![4, 5, 6]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("28")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+---+---+------+-----+",
            "| a | b | c | year | day |",
            "+---+---+---+------+-----+",
            "| 0 | 2 | 4 | 2021 | 28  |",
            "| 1 | 3 | 5 | 2021 | 28  |",
            "| 3 | 4 | 6 | 2021 | 28  |",
            "+---+---+---+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    ScalarValue::from("2021"),
                    ScalarValue::from("10"),
                    ScalarValue::from("26"),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);
    }

    #[test]
    fn test_projected_file_schema_with_partition_col() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            Some(vec![0, 3, 5, schema.fields().len()]),
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        let expected_columns = vec!["c1", "c4", "c6"];
        let actual_columns = projection
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect::<Vec<_>>();
        assert_eq!(expected_columns, actual_columns);
    }

    #[test]
    fn test_projected_file_schema_without_projection() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            None,
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        assert_eq!(projection.fields(), schema.fields());
    }

    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{path::Path, ObjectMeta};

        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
            },
        ];

        for case in cases {
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                PartitionedFile {
                    object_meta: ObjectMeta {
                        location: Path::from(format!(
                            "data/date={}/{}.parquet",
                            file.date, file.name
                        )),
                        last_modified: chrono::Utc.timestamp_nanos(0),
                        size: 0,
                        e_tag: None,
                        version: None,
                    },
                    partition_values: vec![ScalarValue::from(file.date)],
                    range: None,
                    statistics: Some(Arc::new(Statistics {
                        num_rows: Precision::Absent,
                        total_byte_size: Precision::Absent,
                        column_statistics: file
                            .statistics
                            .into_iter()
                            .map(|stats| {
                                stats
                                    .map(|(min, max)| ColumnStatistics {
                                        min_value: Precision::Exact(
                                            ScalarValue::Float64(min),
                                        ),
                                        max_value: Precision::Exact(
                                            ScalarValue::Float64(max),
                                        ),
                                        ..Default::default()
                                    })
                                    .unwrap_or_default()
                            })
                            .collect::<Vec<_>>(),
                    })),
                    extensions: None,
                    metadata_size_hint: None,
                }
            }
        }
    }

    fn config_for_projection(
        file_schema: SchemaRef,
        projection: Option<Vec<usize>>,
        statistics: Statistics,
        table_partition_cols: Vec<Field>,
    ) -> FileScanConfig {
        FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema,
            Arc::new(MockSource::default()),
        )
        .with_projection(projection)
        .with_statistics(statistics)
        .with_table_partition_cols(table_partition_cols)
        .build()
    }

    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
        table_partition_cols
            .iter()
            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
            .collect::<Vec<_>>()
    }

    pub fn build_table_i32(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
    ) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new(a.0, DataType::Int32, false),
            Field::new(b.0, DataType::Int32, false),
            Field::new(c.0, DataType::Int32, false),
        ]);

        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(a.1.clone())),
                Arc::new(Int32Array::from(b.1.clone())),
                Arc::new(Int32Array::from(c.1.clone())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_file_scan_config_builder() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        let builder = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        );

        let config = builder
            .with_limit(Some(1000))
            .with_projection(Some(vec![0, 1]))
            .with_table_partition_cols(vec![Field::new(
                "date",
                wrap_partition_type_in_dict(DataType::Utf8),
                false,
            )])
            .with_statistics(Statistics::new_unknown(&file_schema))
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "test.parquet".to_string(),
                1024,
            )])])
            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
                Column::new("date", 0),
            ))]
            .into()])
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .with_newlines_in_values(true)
            .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(config.file_schema, file_schema);
        assert_eq!(config.limit, Some(1000));
        assert_eq!(config.projection, Some(vec![0, 1]));
        assert_eq!(config.table_partition_cols.len(), 1);
        assert_eq!(config.table_partition_cols[0].name(), "date");
        assert_eq!(config.file_groups.len(), 1);
        assert_eq!(config.file_groups[0].len(), 1);
        assert_eq!(
            config.file_groups[0][0].object_meta.location.as_ref(),
            "test.parquet"
        );
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(config.new_lines_in_values);
        assert_eq!(config.output_ordering.len(), 1);
    }

    #[test]
    fn test_file_scan_config_builder_defaults() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_schema),
            Arc::clone(&file_source),
        )
        .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(config.file_schema, file_schema);
        assert_eq!(config.limit, None);
        assert_eq!(config.projection, None);
        assert!(config.table_partition_cols.is_empty());
        assert!(config.file_groups.is_empty());
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(!config.new_lines_in_values);
        assert!(config.output_ordering.is_empty());
        assert!(config.constraints.is_empty());

        assert_eq!(
            config.file_source.statistics().unwrap().num_rows,
            Precision::Absent
        );
        assert_eq!(
            config.file_source.statistics().unwrap().total_byte_size,
            Precision::Absent
        );
        assert_eq!(
            config
                .file_source
                .statistics()
                .unwrap()
                .column_statistics
                .len(),
            file_schema.fields().len()
        );
        for stat in config.file_source.statistics().unwrap().column_statistics {
            assert_eq!(stat.distinct_count, Precision::Absent);
            assert_eq!(stat.min_value, Precision::Absent);
            assert_eq!(stat.max_value, Precision::Absent);
            assert_eq!(stat.null_count, Precision::Absent);
        }
    }

    #[test]
    fn test_file_scan_config_builder_new_from() {
        let schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
        let partition_cols = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];
        let file = PartitionedFile::new("test_file.parquet", 100);

        let original_config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&schema),
            Arc::clone(&file_source),
        )
        .with_projection(Some(vec![0, 2]))
        .with_limit(Some(10))
        .with_table_partition_cols(partition_cols.clone())
        .with_file(file.clone())
        .with_constraints(Constraints::default())
        .with_newlines_in_values(true)
        .build();

        let new_builder = FileScanConfigBuilder::from(original_config);

        let new_config = new_builder.build();

        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
        assert_eq!(new_config.object_store_url, object_store_url);
        assert_eq!(new_config.file_schema, schema);
        assert_eq!(new_config.projection, Some(vec![0, 2]));
        assert_eq!(new_config.limit, Some(10));
        assert_eq!(new_config.table_partition_cols, partition_cols);
        assert_eq!(new_config.file_groups.len(), 1);
        assert_eq!(new_config.file_groups[0].len(), 1);
        assert_eq!(
            new_config.file_groups[0][0].object_meta.location.as_ref(),
            "test_file.parquet"
        );
        assert_eq!(new_config.constraints, Constraints::default());
        assert!(new_config.new_lines_in_values);
    }
2321 #[test]
2322 fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2323 use datafusion_common::DFSchema;
2324 use datafusion_expr::{col, execution_props::ExecutionProps};
2325
2326 let schema = Arc::new(Schema::new(vec![Field::new(
2327 "value",
2328 DataType::Float64,
2329 false,
2330 )]));
2331
2332 let exec_props = ExecutionProps::new();
2334 let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2335 let sort_expr = [col("value").sort(true, false)];
2336 let sort_ordering = sort_expr
2337 .map(|expr| {
2338 create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2339 })
2340 .into();
2341
2342 struct TestCase {
2344 name: String,
2345 file_count: usize,
2346 overlap_factor: f64,
2347 target_partitions: usize,
2348 expected_partition_count: usize,
2349 }
2350
2351 let test_cases = vec![
2352 TestCase {
2354 name: "no_overlap_10_files_4_partitions".to_string(),
2355 file_count: 10,
2356 overlap_factor: 0.0,
2357 target_partitions: 4,
2358 expected_partition_count: 4,
2359 },
2360 TestCase {
2361 name: "medium_overlap_20_files_5_partitions".to_string(),
2362 file_count: 20,
2363 overlap_factor: 0.5,
2364 target_partitions: 5,
2365 expected_partition_count: 5,
2366 },
2367 TestCase {
2368 name: "high_overlap_30_files_3_partitions".to_string(),
2369 file_count: 30,
2370 overlap_factor: 0.8,
2371 target_partitions: 3,
2372 expected_partition_count: 7,
2373 },
2374 TestCase {
2376 name: "fewer_files_than_partitions".to_string(),
2377 file_count: 3,
2378 overlap_factor: 0.0,
2379 target_partitions: 10,
2380 expected_partition_count: 3, },
2382 TestCase {
2383 name: "single_file".to_string(),
2384 file_count: 1,
2385 overlap_factor: 0.0,
2386 target_partitions: 5,
2387 expected_partition_count: 1, },
2389 TestCase {
2390 name: "empty_files".to_string(),
2391 file_count: 0,
2392 overlap_factor: 0.0,
2393 target_partitions: 3,
2394 expected_partition_count: 0, },
2396 ];
2397
2398 for case in test_cases {
2399 println!("Running test case: {}", case.name);
2400
2401 let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2403
2404 let result =
2406 FileScanConfig::split_groups_by_statistics_with_target_partitions(
2407 &schema,
2408 &file_groups,
2409 &sort_ordering,
2410 case.target_partitions,
2411 )?;
2412
2413 println!(
2415 "Created {} partitions (target was {})",
2416 result.len(),
2417 case.target_partitions
2418 );
2419
2420 assert_eq!(
2422 result.len(),
2423 case.expected_partition_count,
2424 "Case '{}': Unexpected partition count",
2425 case.name
2426 );
2427
2428 assert!(
2430 verify_sort_integrity(&result),
2431 "Case '{}': Files within partitions are not properly ordered",
2432 case.name
2433 );
2434
2435 if case.file_count > 1 && case.expected_partition_count > 1 {
2437 let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2438 let max_size = *group_sizes.iter().max().unwrap();
2439 let min_size = *group_sizes.iter().min().unwrap();
2440
2441 let avg_files_per_partition =
2443 case.file_count as f64 / case.expected_partition_count as f64;
2444 assert!(
2445 (max_size as f64) < 2.0 * avg_files_per_partition,
2446 "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2447 case.name,
2448 max_size,
2449 avg_files_per_partition
2450 );
2451
2452 println!("Distribution - min files: {min_size}, max files: {max_size}");
2453 }
2454 }
2455
2456 let empty_groups: Vec<FileGroup> = vec![];
2458 let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2459 &schema,
2460 &empty_groups,
2461 &sort_ordering,
2462 0,
2463 )
2464 .unwrap_err();
2465
2466 assert!(
2467 err.to_string()
2468 .contains("target_partitions must be greater than 0"),
2469 "Expected error for zero target partitions"
2470 );
2471
2472 Ok(())
2473 }
2474}