1use std::{
22 any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23 fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use crate::file_groups::FileGroup;
27#[allow(unused_imports)]
28use crate::schema_adapter::SchemaAdapterFactory;
29use crate::{
30 display::FileGroupsDisplay,
31 file::FileSource,
32 file_compression_type::FileCompressionType,
33 file_stream::FileStream,
34 source::{DataSource, DataSourceExec},
35 statistics::MinMaxStatistics,
36 PartitionedFile,
37};
38use arrow::datatypes::FieldRef;
39use arrow::{
40 array::{
41 ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
42 RecordBatchOptions,
43 },
44 buffer::Buffer,
45 datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
46};
47use datafusion_common::config::ConfigOptions;
48use datafusion_common::{
49 exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
50 Statistics,
51};
52use datafusion_execution::{
53 object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
54};
55use datafusion_physical_expr::expressions::Column;
56use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
57use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
58use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
59use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
60use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
61use datafusion_physical_plan::{
62 display::{display_orderings, ProjectSchemaDisplay},
63 metrics::ExecutionPlanMetricsSet,
64 projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
65 DisplayAs, DisplayFormatType, ExecutionPlan,
66};
67
68use datafusion_physical_plan::coop::cooperative;
69use datafusion_physical_plan::execution_plan::SchedulingType;
70use log::{debug, warn};
71
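/// The base configuration for scanning file-based tables: which object store
/// to read from, which files to read, the file-level schema, projection,
/// limit, partition columns, output orderings, and the [`FileSource`] used to
/// decode the files. Normally constructed with [`FileScanConfigBuilder`] and
/// executed through [`DataSourceExec`].
///
/// A minimal sketch of building a config (illustrative only; it assumes a
/// concrete [`FileSource`] implementation, e.g. a Parquet or CSV source, is
/// in scope as `file_source`, along with a `file_schema` for the files):
///
/// ```rust,ignore
/// # use std::sync::Arc;
/// let config = FileScanConfigBuilder::new(
///     ObjectStoreUrl::parse("file:///").unwrap(),
///     Arc::clone(&file_schema),   // Arc<Schema> describing the file columns
///     Arc::clone(&file_source),   // Arc<dyn FileSource>
/// )
/// .with_projection(Some(vec![0, 1]))
/// .with_limit(Some(1000))
/// .with_file(PartitionedFile::new("path/to/data.parquet", 1234))
/// .build();
///
/// // Wrap the scan in an ExecutionPlan node for execution
/// let plan = DataSourceExec::from_data_source(config);
/// ```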
72#[derive(Clone)]
141pub struct FileScanConfig {
142 pub object_store_url: ObjectStoreUrl,
154 pub file_schema: SchemaRef,
161 pub file_groups: Vec<FileGroup>,
171 pub constraints: Constraints,
173 pub projection: Option<Vec<usize>>,
176 pub limit: Option<usize>,
179 pub table_partition_cols: Vec<FieldRef>,
181 pub output_ordering: Vec<LexOrdering>,
183 pub file_compression_type: FileCompressionType,
185 pub new_lines_in_values: bool,
187 pub file_source: Arc<dyn FileSource>,
189 pub batch_size: Option<usize>,
192 pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
195}
196
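/// Builder for [`FileScanConfig`]. Collects the scan options (files,
/// projection, limit, orderings, statistics, partition columns, ...) and
/// applies defaults for anything left unset when [`build`](Self::build) is
/// called.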
197#[derive(Clone)]
247pub struct FileScanConfigBuilder {
248 object_store_url: ObjectStoreUrl,
249 file_schema: SchemaRef,
260 file_source: Arc<dyn FileSource>,
261
262 limit: Option<usize>,
263 projection: Option<Vec<usize>>,
264 table_partition_cols: Vec<FieldRef>,
265 constraints: Option<Constraints>,
266 file_groups: Vec<FileGroup>,
267 statistics: Option<Statistics>,
268 output_ordering: Vec<LexOrdering>,
269 file_compression_type: Option<FileCompressionType>,
270 new_lines_in_values: Option<bool>,
271 batch_size: Option<usize>,
272 expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
273}
274
275impl FileScanConfigBuilder {
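    /// Create a new builder from the three required pieces: the object store
    /// the files live in, the schema of the files, and the [`FileSource`]
    /// used to decode them. All other options start at their defaults.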
276 pub fn new(
283 object_store_url: ObjectStoreUrl,
284 file_schema: SchemaRef,
285 file_source: Arc<dyn FileSource>,
286 ) -> Self {
287 Self {
288 object_store_url,
289 file_schema,
290 file_source,
291 file_groups: vec![],
292 statistics: None,
293 output_ordering: vec![],
294 file_compression_type: None,
295 new_lines_in_values: None,
296 limit: None,
297 projection: None,
298 table_partition_cols: vec![],
299 constraints: None,
300 batch_size: None,
301 expr_adapter_factory: None,
302 }
303 }
304
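    /// Set an optional limit on the number of rows to scan.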
305 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
308 self.limit = limit;
309 self
310 }
311
312 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
317 self.file_source = file_source;
318 self
319 }
320
321 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
324 self.projection = projection;
325 self
326 }
327
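    /// Set the partition columns derived from the directory layout
    /// (e.g. Hive-style `col=value` paths). They are appended after the file
    /// columns in the projected schema.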
328 pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
330 self.table_partition_cols = table_partition_cols
331 .into_iter()
332 .map(|f| Arc::new(f) as FieldRef)
333 .collect();
334 self
335 }
336
337 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
339 self.constraints = Some(constraints);
340 self
341 }
342
343 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
346 self.statistics = Some(statistics);
347 self
348 }
349
350 pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
360 self.file_groups = file_groups;
361 self
362 }
363
364 pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
368 self.file_groups.push(file_group);
369 self
370 }
371
372 pub fn with_file(self, file: PartitionedFile) -> Self {
376 self.with_file_group(FileGroup::new(vec![file]))
377 }
378
379 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
381 self.output_ordering = output_ordering;
382 self
383 }
384
385 pub fn with_file_compression_type(
387 mut self,
388 file_compression_type: FileCompressionType,
389 ) -> Self {
390 self.file_compression_type = Some(file_compression_type);
391 self
392 }
393
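    /// Set whether quoted values may contain embedded newlines (relevant for
    /// line-delimited formats such as CSV). When true, files cannot be split
    /// at arbitrary byte offsets for parallel reads.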
394 pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
400 self.new_lines_in_values = Some(new_lines_in_values);
401 self
402 }
403
404 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
406 self.batch_size = batch_size;
407 self
408 }
409
410 pub fn with_expr_adapter(
417 mut self,
418 expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
419 ) -> Self {
420 self.expr_adapter_factory = expr_adapter;
421 self
422 }
423
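    /// Consume the builder and produce a [`FileScanConfig`], filling in
    /// defaults: unknown statistics, no constraints, uncompressed files, and
    /// `new_lines_in_values = false` unless set otherwise.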
424 pub fn build(self) -> FileScanConfig {
429 let Self {
430 object_store_url,
431 file_schema,
432 file_source,
433 limit,
434 projection,
435 table_partition_cols,
436 constraints,
437 file_groups,
438 statistics,
439 output_ordering,
440 file_compression_type,
441 new_lines_in_values,
442 batch_size,
443 expr_adapter_factory: expr_adapter,
444 } = self;
445
446 let constraints = constraints.unwrap_or_default();
447 let statistics =
448 statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
449
450 let file_source = file_source
451 .with_statistics(statistics.clone())
452 .with_schema(Arc::clone(&file_schema));
453 let file_compression_type =
454 file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
455 let new_lines_in_values = new_lines_in_values.unwrap_or(false);
456
457 FileScanConfig {
458 object_store_url,
459 file_schema,
460 file_source,
461 limit,
462 projection,
463 table_partition_cols,
464 constraints,
465 file_groups,
466 output_ordering,
467 file_compression_type,
468 new_lines_in_values,
469 batch_size,
470 expr_adapter_factory: expr_adapter,
471 }
472 }
473}
474
475impl From<FileScanConfig> for FileScanConfigBuilder {
476 fn from(config: FileScanConfig) -> Self {
477 Self {
478 object_store_url: config.object_store_url,
479 file_schema: config.file_schema,
480 file_source: Arc::<dyn FileSource>::clone(&config.file_source),
481 file_groups: config.file_groups,
482 statistics: config.file_source.statistics().ok(),
483 output_ordering: config.output_ordering,
484 file_compression_type: Some(config.file_compression_type),
485 new_lines_in_values: Some(config.new_lines_in_values),
486 limit: config.limit,
487 projection: config.projection,
488 table_partition_cols: config.table_partition_cols,
489 constraints: Some(config.constraints),
490 batch_size: config.batch_size,
491 expr_adapter_factory: config.expr_adapter_factory,
492 }
493 }
494}
495
496impl DataSource for FileScanConfig {
497 fn open(
498 &self,
499 partition: usize,
500 context: Arc<TaskContext>,
501 ) -> Result<SendableRecordBatchStream> {
502 let object_store = context.runtime_env().object_store(&self.object_store_url)?;
503 let batch_size = self
504 .batch_size
505 .unwrap_or_else(|| context.session_config().batch_size());
506
507 let source = self
508 .file_source
509 .with_batch_size(batch_size)
510 .with_projection(self);
511
512 let opener = source.create_file_opener(object_store, self, partition);
513
514 let stream = FileStream::new(self, partition, opener, source.metrics())?;
515 Ok(Box::pin(cooperative(stream)))
516 }
517
518 fn as_any(&self) -> &dyn Any {
519 self
520 }
521
522 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
523 match t {
524 DisplayFormatType::Default | DisplayFormatType::Verbose => {
525 let schema = self.projected_schema();
526 let orderings = get_projected_output_ordering(self, &schema);
527
528 write!(f, "file_groups=")?;
529 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
530
531 if !schema.fields().is_empty() {
532 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
533 }
534
535 if let Some(limit) = self.limit {
536 write!(f, ", limit={limit}")?;
537 }
538
539 display_orderings(f, &orderings)?;
540
541 if !self.constraints.is_empty() {
542 write!(f, ", {}", self.constraints)?;
543 }
544
545 self.fmt_file_source(t, f)
546 }
547 DisplayFormatType::TreeRender => {
548 writeln!(f, "format={}", self.file_source.file_type())?;
549 self.file_source.fmt_extra(t, f)?;
550 let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
551 writeln!(f, "files={num_files}")?;
552 Ok(())
553 }
554 }
555 }
556
557 fn repartitioned(
559 &self,
560 target_partitions: usize,
561 repartition_file_min_size: usize,
562 output_ordering: Option<LexOrdering>,
563 ) -> Result<Option<Arc<dyn DataSource>>> {
564 let source = self.file_source.repartitioned(
565 target_partitions,
566 repartition_file_min_size,
567 output_ordering,
568 self,
569 )?;
570
571 Ok(source.map(|s| Arc::new(s) as _))
572 }
573
574 fn output_partitioning(&self) -> Partitioning {
575 Partitioning::UnknownPartitioning(self.file_groups.len())
576 }
577
578 fn eq_properties(&self) -> EquivalenceProperties {
579 let (schema, constraints, _, orderings) = self.project();
580 EquivalenceProperties::new_with_orderings(schema, orderings)
581 .with_constraints(constraints)
582 }
583
584 fn scheduling_type(&self) -> SchedulingType {
585 SchedulingType::Cooperative
586 }
587
588 fn statistics(&self) -> Result<Statistics> {
589 Ok(self.projected_stats())
590 }
591
592 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
593 let source = FileScanConfigBuilder::from(self.clone())
594 .with_limit(limit)
595 .build();
596 Some(Arc::new(source))
597 }
598
599 fn fetch(&self) -> Option<usize> {
600 self.limit
601 }
602
603 fn metrics(&self) -> ExecutionPlanMetricsSet {
604 self.file_source.metrics().clone()
605 }
606
607 fn try_swapping_with_projection(
608 &self,
609 projection: &ProjectionExec,
610 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
611 let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
615 expr.as_any()
616 .downcast_ref::<Column>()
617 .map(|expr| expr.index() >= self.file_schema.fields().len())
618 .unwrap_or(false)
619 });
620
621 let no_aliases = all_alias_free_columns(projection.expr());
623
624 Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
625 let file_scan = self.clone();
626 let source = Arc::clone(&file_scan.file_source);
627 let new_projections = new_projections_for_columns(
628 projection,
629 &file_scan
630 .projection
631 .clone()
632 .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
633 );
634 DataSourceExec::from_data_source(
635 FileScanConfigBuilder::from(file_scan)
636 .with_projection(Some(new_projections))
638 .with_source(source)
639 .build(),
640 ) as _
641 }))
642 }
643
644 fn try_pushdown_filters(
645 &self,
646 filters: Vec<Arc<dyn PhysicalExpr>>,
647 config: &ConfigOptions,
648 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
649 let result = self.file_source.try_pushdown_filters(filters, config)?;
650 match result.updated_node {
651 Some(new_file_source) => {
652 let file_scan_config = FileScanConfigBuilder::from(self.clone())
653 .with_source(new_file_source)
654 .build();
655 Ok(FilterPushdownPropagation {
656 filters: result.filters,
657 updated_node: Some(Arc::new(file_scan_config) as _),
658 })
659 }
660 None => {
661 Ok(FilterPushdownPropagation {
663 filters: result.filters,
664 updated_node: None,
665 })
666 }
667 }
668 }
669}
670
671impl FileScanConfig {
672 #[allow(deprecated)]
    pub fn new(
684 object_store_url: ObjectStoreUrl,
685 file_schema: SchemaRef,
686 file_source: Arc<dyn FileSource>,
687 ) -> Self {
688 let statistics = Statistics::new_unknown(&file_schema);
689 let file_source = file_source
690 .with_statistics(statistics.clone())
691 .with_schema(Arc::clone(&file_schema));
692 Self {
693 object_store_url,
694 file_schema,
695 file_groups: vec![],
696 constraints: Constraints::default(),
697 projection: None,
698 limit: None,
699 table_partition_cols: vec![],
700 output_ordering: vec![],
701 file_compression_type: FileCompressionType::UNCOMPRESSED,
702 new_lines_in_values: false,
703 file_source: Arc::clone(&file_source),
704 batch_size: None,
705 expr_adapter_factory: None,
706 }
707 }
708
709 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
711 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
712 self.file_source =
713 file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
714 self
715 }
716
717 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
719 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
720 self.constraints = constraints;
721 self
722 }
723
724 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
726 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
727 self.file_source = self.file_source.with_statistics(statistics);
728 self
729 }
730
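    /// The column indices selected by this scan: the explicit projection if
    /// one is set, otherwise every file column followed by every partition
    /// column.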
731 fn projection_indices(&self) -> Vec<usize> {
732 match &self.projection {
733 Some(proj) => proj.clone(),
734 None => (0..self.file_schema.fields().len()
735 + self.table_partition_cols.len())
736 .collect(),
737 }
738 }
739
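    /// Statistics for the projected columns. File-column statistics come from
    /// the [`FileSource`]; partition columns report unknown statistics.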
740 pub fn projected_stats(&self) -> Statistics {
741 let statistics = self.file_source.statistics().unwrap();
742
743 let table_cols_stats = self
744 .projection_indices()
745 .into_iter()
746 .map(|idx| {
747 if idx < self.file_schema.fields().len() {
748 statistics.column_statistics[idx].clone()
749 } else {
750 ColumnStatistics::new_unknown()
752 }
753 })
754 .collect();
755
756 Statistics {
757 num_rows: statistics.num_rows,
758 total_byte_size: statistics.total_byte_size,
760 column_statistics: table_cols_stats,
761 }
762 }
763
764 pub fn projected_schema(&self) -> Arc<Schema> {
765 let table_fields: Vec<_> = self
766 .projection_indices()
767 .into_iter()
768 .map(|idx| {
769 if idx < self.file_schema.fields().len() {
770 self.file_schema.field(idx).clone()
771 } else {
772 let partition_idx = idx - self.file_schema.fields().len();
773 Arc::unwrap_or_clone(Arc::clone(
774 &self.table_partition_cols[partition_idx],
775 ))
776 }
777 })
778 .collect();
779
780 Arc::new(Schema::new_with_metadata(
781 table_fields,
782 self.file_schema.metadata().clone(),
783 ))
784 }
785
786 pub fn projected_constraints(&self) -> Constraints {
787 let indexes = self.projection_indices();
788 self.constraints.project(&indexes).unwrap_or_default()
789 }
790
791 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
793 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
794 self.projection = projection;
795 self
796 }
797
798 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
800 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
801 self.limit = limit;
802 self
803 }
804
805 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
809 #[allow(deprecated)]
810 pub fn with_file(self, file: PartitionedFile) -> Self {
811 self.with_file_group(FileGroup::new(vec![file]))
812 }
813
814 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
818 pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self {
819 self.file_groups.append(&mut file_groups);
820 self
821 }
822
823 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
827 pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
828 self.file_groups.push(file_group);
829 self
830 }
831
832 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
834 pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
835 self.table_partition_cols = table_partition_cols
836 .into_iter()
837 .map(|f| Arc::new(f) as FieldRef)
838 .collect();
839 self
840 }
841
842 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
844 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
845 self.output_ordering = output_ordering;
846 self
847 }
848
849 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
851 pub fn with_file_compression_type(
852 mut self,
853 file_compression_type: FileCompressionType,
854 ) -> Self {
855 self.file_compression_type = file_compression_type;
856 self
857 }
858
859 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
861 pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
862 self.new_lines_in_values = new_lines_in_values;
863 self
864 }
865
866 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
868 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
869 self.batch_size = batch_size;
870 self
871 }
872
873 pub fn newlines_in_values(&self) -> bool {
881 self.new_lines_in_values
882 }
883
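    /// Project the full table (file columns plus partition columns) down to
    /// the configured projection, returning the projected schema, constraints,
    /// statistics, and output orderings.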
884 pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
886 if self.projection.is_none() && self.table_partition_cols.is_empty() {
887 return (
888 Arc::clone(&self.file_schema),
889 self.constraints.clone(),
890 self.file_source.statistics().unwrap().clone(),
891 self.output_ordering.clone(),
892 );
893 }
894
895 let schema = self.projected_schema();
896 let constraints = self.projected_constraints();
897 let stats = self.projected_stats();
898
899 let output_ordering = get_projected_output_ordering(self, &schema);
900
901 (schema, constraints, stats, output_ordering)
902 }
903
904 pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
905 self.projection.as_ref().map(|p| {
906 p.iter()
907 .filter(|col_idx| **col_idx < self.file_schema.fields().len())
908 .map(|col_idx| self.file_schema.field(*col_idx).name())
909 .cloned()
910 .collect()
911 })
912 }
913
914 pub fn projected_file_schema(&self) -> SchemaRef {
916 let fields = self.file_column_projection_indices().map(|indices| {
917 indices
918 .iter()
919 .map(|col_idx| self.file_schema.field(*col_idx))
920 .cloned()
921 .collect::<Vec<_>>()
922 });
923
924 fields.map_or_else(
925 || Arc::clone(&self.file_schema),
926 |f| {
927 Arc::new(Schema::new_with_metadata(
928 f,
929 self.file_schema.metadata.clone(),
930 ))
931 },
932 )
933 }
934
935 pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
936 self.projection.as_ref().map(|p| {
937 p.iter()
938 .filter(|col_idx| **col_idx < self.file_schema.fields().len())
939 .copied()
940 .collect()
941 })
942 }
943
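    /// Attempt to split the files into `target_partitions` groups such that,
    /// within each group, files do not overlap on `sort_order` and are ordered
    /// by their minimum sort key. Each file is assigned greedily to the
    /// smallest compatible group (one that is empty or whose last file ends
    /// before this one begins); a group beyond the target is created only
    /// when a file overlaps every existing group, and empty groups are
    /// dropped. Returns an error if `target_partitions` is zero and an empty
    /// `Vec` when there are no files.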
944 pub fn split_groups_by_statistics_with_target_partitions(
966 table_schema: &SchemaRef,
967 file_groups: &[FileGroup],
968 sort_order: &LexOrdering,
969 target_partitions: usize,
970 ) -> Result<Vec<FileGroup>> {
971 if target_partitions == 0 {
972 return Err(DataFusionError::Internal(
973 "target_partitions must be greater than 0".to_string(),
974 ));
975 }
976
977 let flattened_files = file_groups
978 .iter()
979 .flat_map(FileGroup::iter)
980 .collect::<Vec<_>>();
981
982 if flattened_files.is_empty() {
983 return Ok(vec![]);
984 }
985
986 let statistics = MinMaxStatistics::new_from_files(
987 sort_order,
988 table_schema,
989 None,
990 flattened_files.iter().copied(),
991 )?;
992
993 let indices_sorted_by_min = statistics.min_values_sorted();
994
995 let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
997
998 for (idx, min) in indices_sorted_by_min {
999 if let Some((_, group)) = file_groups_indices
1000 .iter_mut()
1001 .enumerate()
1002 .filter(|(_, group)| {
1003 group.is_empty()
1004 || min
1005 > statistics
1006 .max(*group.last().expect("groups should not be empty"))
1007 })
1008 .min_by_key(|(_, group)| group.len())
1009 {
1010 group.push(idx);
1011 } else {
1012 file_groups_indices.push(vec![idx]);
1014 }
1015 }
1016
1017 file_groups_indices.retain(|group| !group.is_empty());
1019
1020 Ok(file_groups_indices
1022 .into_iter()
1023 .map(|file_group_indices| {
1024 FileGroup::new(
1025 file_group_indices
1026 .into_iter()
1027 .map(|idx| flattened_files[idx].clone())
1028 .collect(),
1029 )
1030 })
1031 .collect())
1032 }
1033
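    /// Split the files into groups such that, within each group, files do not
    /// overlap on `sort_order` and are ordered by their minimum sort key,
    /// without a target partition count. Each file goes into the first
    /// existing group whose last file ends before it begins; otherwise a new
    /// group is started. Errors if min/max statistics are missing for any
    /// sort column.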
1034 pub fn split_groups_by_statistics(
1038 table_schema: &SchemaRef,
1039 file_groups: &[FileGroup],
1040 sort_order: &LexOrdering,
1041 ) -> Result<Vec<FileGroup>> {
1042 let flattened_files = file_groups
1043 .iter()
1044 .flat_map(FileGroup::iter)
1045 .collect::<Vec<_>>();
1046 if flattened_files.is_empty() {
1058 return Ok(vec![]);
1059 }
1060
1061 let statistics = MinMaxStatistics::new_from_files(
1062 sort_order,
1063 table_schema,
1064 None,
1065 flattened_files.iter().copied(),
1066 )
1067 .map_err(|e| {
1068 e.context("construct min/max statistics for split_groups_by_statistics")
1069 })?;
1070
1071 let indices_sorted_by_min = statistics.min_values_sorted();
1072 let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1073
1074 for (idx, min) in indices_sorted_by_min {
1075 let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1076 min > statistics.max(
1079 *group
1080 .last()
1081 .expect("groups should be nonempty at construction"),
1082 )
1083 });
1084 match file_group_to_insert {
1085 Some(group) => group.push(idx),
1086 None => file_groups_indices.push(vec![idx]),
1087 }
1088 }
1089
1090 Ok(file_groups_indices
1092 .into_iter()
1093 .map(|file_group_indices| {
1094 file_group_indices
1095 .into_iter()
1096 .map(|idx| flattened_files[idx].clone())
1097 .collect()
1098 })
1099 .collect())
1100 }
1101
1102 #[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")]
1104 pub fn build(self) -> Arc<DataSourceExec> {
1105 DataSourceExec::from_data_source(self)
1106 }
1107
1108 fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1110 write!(f, ", file_type={}", self.file_source.file_type())?;
1111 self.file_source.fmt_extra(t, f)
1112 }
1113
1114 pub fn file_source(&self) -> &Arc<dyn FileSource> {
1116 &self.file_source
1117 }
1118}
1119
1120impl Debug for FileScanConfig {
1121 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1122 write!(f, "FileScanConfig {{")?;
1123 write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1124
1125 write!(
1126 f,
1127 "statistics={:?}, ",
1128 self.file_source.statistics().unwrap()
1129 )?;
1130
1131 DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1132 write!(f, "}}")
1133 }
1134}
1135
1136impl DisplayAs for FileScanConfig {
1137 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1138 let schema = self.projected_schema();
1139 let orderings = get_projected_output_ordering(self, &schema);
1140
1141 write!(f, "file_groups=")?;
1142 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1143
1144 if !schema.fields().is_empty() {
1145 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1146 }
1147
1148 if let Some(limit) = self.limit {
1149 write!(f, ", limit={limit}")?;
1150 }
1151
1152 display_orderings(f, &orderings)?;
1153
1154 if !self.constraints.is_empty() {
1155 write!(f, ", {}", self.constraints)?;
1156 }
1157
1158 Ok(())
1159 }
1160}
1161
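/// Adds a table's partition columns to [`RecordBatch`]es read from files,
/// which contain only the file columns. Because a partition value is constant
/// for an entire file, dictionary-encoded partition columns are materialized
/// from a shared, cached zero-filled key buffer.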
1162pub struct PartitionColumnProjector {
1169 key_buffer_cache: ZeroBufferGenerators,
1173 projected_partition_indexes: Vec<(usize, usize)>,
1177 projected_schema: SchemaRef,
1179}
1180
1181impl PartitionColumnProjector {
1182 pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1186 let mut idx_map = HashMap::new();
1187 for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1188 if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1189 idx_map.insert(partition_idx, schema_idx);
1190 }
1191 }
1192
1193 let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1194 projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1195
1196 Self {
1197 projected_partition_indexes,
1198 key_buffer_cache: Default::default(),
1199 projected_schema,
1200 }
1201 }
1202
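    /// Insert the given partition values into `file_batch` at the positions
    /// the projected schema expects, returning a batch with the full set of
    /// projected columns. Values that should be dictionary encoded but are
    /// not are wrapped automatically (with a warning). Errors if the batch
    /// does not contain the expected number of file columns.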
1203 pub fn project(
1208 &mut self,
1209 file_batch: RecordBatch,
1210 partition_values: &[ScalarValue],
1211 ) -> Result<RecordBatch> {
1212 let expected_cols =
1213 self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1214
1215 if file_batch.columns().len() != expected_cols {
1216 return exec_err!(
1217 "Unexpected batch schema from file, expected {} cols but got {}",
1218 expected_cols,
1219 file_batch.columns().len()
1220 );
1221 }
1222
1223 let mut cols = file_batch.columns().to_vec();
1224 for &(pidx, sidx) in &self.projected_partition_indexes {
1225 let p_value =
1226 partition_values
1227 .get(pidx)
1228 .ok_or(DataFusionError::Execution(
1229 "Invalid partitioning found on disk".to_string(),
1230 ))?;
1231
1232 let mut partition_value = Cow::Borrowed(p_value);
1233
1234 let field = self.projected_schema.field(sidx);
1236 let expected_data_type = field.data_type();
1237 let actual_data_type = partition_value.data_type();
1238 if let DataType::Dictionary(key_type, _) = expected_data_type {
1239 if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1240 warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1241 partition_value = Cow::Owned(ScalarValue::Dictionary(
1242 key_type.clone(),
1243 Box::new(partition_value.as_ref().clone()),
1244 ));
1245 }
1246 }
1247
1248 cols.insert(
1249 sidx,
1250 create_output_array(
1251 &mut self.key_buffer_cache,
1252 partition_value.as_ref(),
1253 file_batch.num_rows(),
1254 )?,
1255 )
1256 }
1257
1258 RecordBatch::try_new_with_options(
1259 Arc::clone(&self.projected_schema),
1260 cols,
1261 &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1262 )
1263 .map_err(Into::into)
1264 }
1265}
1266
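/// Per-key-type cache of zero-filled key buffers used to build the dictionary
/// arrays for partition columns.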
1267#[derive(Debug, Default)]
1268struct ZeroBufferGenerators {
1269 gen_i8: ZeroBufferGenerator<i8>,
1270 gen_i16: ZeroBufferGenerator<i16>,
1271 gen_i32: ZeroBufferGenerator<i32>,
1272 gen_i64: ZeroBufferGenerator<i64>,
1273 gen_u8: ZeroBufferGenerator<u8>,
1274 gen_u16: ZeroBufferGenerator<u16>,
1275 gen_u32: ZeroBufferGenerator<u32>,
1276 gen_u64: ZeroBufferGenerator<u64>,
1277}
1278
1279#[derive(Debug, Default)]
1281struct ZeroBufferGenerator<T>
1282where
1283 T: ArrowNativeType,
1284{
1285 cache: Option<Buffer>,
1286 _t: PhantomData<T>,
1287}
1288
1289impl<T> ZeroBufferGenerator<T>
1290where
1291 T: ArrowNativeType,
1292{
1293 const SIZE: usize = size_of::<T>();
1294
1295 fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1296 match &mut self.cache {
1297 Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1298 buf.slice_with_length(0, n_vals * Self::SIZE)
1299 }
1300 _ => {
1301 let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1302 key_buffer_builder.advance(n_vals); // keys are all 0, pointing at the single dictionary value
    self.cache.insert(key_buffer_builder.finish()).clone()
1304 }
1305 }
1306 }
1307}
1308
1309fn create_dict_array<T>(
1310 buffer_gen: &mut ZeroBufferGenerator<T>,
1311 dict_val: &ScalarValue,
1312 len: usize,
1313 data_type: DataType,
1314) -> Result<ArrayRef>
1315where
1316 T: ArrowNativeType,
1317{
1318 let dict_vals = dict_val.to_array()?;
1319
1320 let sliced_key_buffer = buffer_gen.get_buffer(len);
1321
1322 let mut builder = ArrayData::builder(data_type)
1324 .len(len)
1325 .add_buffer(sliced_key_buffer);
1326 builder = builder.add_child_data(dict_vals.to_data());
1327 Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1328 builder.build().unwrap(),
1329 )))
1330}
1331
1332fn create_output_array(
1333 key_buffer_cache: &mut ZeroBufferGenerators,
1334 val: &ScalarValue,
1335 len: usize,
1336) -> Result<ArrayRef> {
1337 if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1338 match key_type.as_ref() {
1339 DataType::Int8 => {
1340 return create_dict_array(
1341 &mut key_buffer_cache.gen_i8,
1342 dict_val,
1343 len,
1344 val.data_type(),
1345 );
1346 }
1347 DataType::Int16 => {
1348 return create_dict_array(
1349 &mut key_buffer_cache.gen_i16,
1350 dict_val,
1351 len,
1352 val.data_type(),
1353 );
1354 }
1355 DataType::Int32 => {
1356 return create_dict_array(
1357 &mut key_buffer_cache.gen_i32,
1358 dict_val,
1359 len,
1360 val.data_type(),
1361 );
1362 }
1363 DataType::Int64 => {
1364 return create_dict_array(
1365 &mut key_buffer_cache.gen_i64,
1366 dict_val,
1367 len,
1368 val.data_type(),
1369 );
1370 }
1371 DataType::UInt8 => {
1372 return create_dict_array(
1373 &mut key_buffer_cache.gen_u8,
1374 dict_val,
1375 len,
1376 val.data_type(),
1377 );
1378 }
1379 DataType::UInt16 => {
1380 return create_dict_array(
1381 &mut key_buffer_cache.gen_u16,
1382 dict_val,
1383 len,
1384 val.data_type(),
1385 );
1386 }
1387 DataType::UInt32 => {
1388 return create_dict_array(
1389 &mut key_buffer_cache.gen_u32,
1390 dict_val,
1391 len,
1392 val.data_type(),
1393 );
1394 }
1395 DataType::UInt64 => {
1396 return create_dict_array(
1397 &mut key_buffer_cache.gen_u64,
1398 dict_val,
1399 len,
1400 val.data_type(),
1401 );
1402 }
1403 _ => {}
1404 }
1405 }
1406
1407 val.to_array_of_size(len)
1408}
1409
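/// Map the configured output orderings onto the projected schema. Only the
/// prefix of sort columns that survives the projection is kept; an ordering
/// is dropped entirely (with a debug log) if that prefix is empty, or if a
/// multi-file group cannot be shown, via min/max statistics, to actually be
/// sorted on it.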
1410fn get_projected_output_ordering(
1470 base_config: &FileScanConfig,
1471 projected_schema: &SchemaRef,
1472) -> Vec<LexOrdering> {
1473 let mut all_orderings = vec![];
1474 for output_ordering in &base_config.output_ordering {
1475 let mut new_ordering = vec![];
1476 for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1477 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1478 let name = col.name();
1479 if let Some((idx, _)) = projected_schema.column_with_name(name) {
1480 new_ordering.push(PhysicalSortExpr::new(
1482 Arc::new(Column::new(name, idx)),
1483 *options,
1484 ));
1485 continue;
1486 }
1487 }
1488 break;
1491 }
1492
1493 let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1494 continue;
1495 };
1496
1497 if base_config.file_groups.iter().any(|group| {
1499 if group.len() <= 1 {
1500 return false;
1502 }
1503
1504 let statistics = match MinMaxStatistics::new_from_files(
1505 &new_ordering,
1506 projected_schema,
1507 base_config.projection.as_deref(),
1508 group.iter(),
1509 ) {
1510 Ok(statistics) => statistics,
1511 Err(e) => {
1512 log::trace!("Error fetching statistics for file group: {e}");
1513 return true;
1515 }
1516 };
1517
1518 !statistics.is_sorted()
1519 }) {
1520 debug!(
1521 "Skipping specified output ordering {:?}. \
1522 Some file groups couldn't be determined to be sorted: {:?}",
1523 output_ordering, base_config.file_groups
1524 );
1525 continue;
1526 }
1527
1528 all_orderings.push(new_ordering);
1529 }
1530 all_orderings
1531}
1532
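/// Wrap a partition column's value type in `Dictionary(UInt16, val_type)`,
/// the encoding used for partition columns since each file repeats a single
/// value many times.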
1533pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1544 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1545}
1546
1547pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1551 ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1552}
1553
1554#[cfg(test)]
1555mod tests {
1556 use super::*;
1557 use crate::{
1558 generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1559 verify_sort_integrity,
1560 };
1561
1562 use arrow::array::{Int32Array, RecordBatch};
1563 use datafusion_common::stats::Precision;
1564 use datafusion_common::{assert_batches_eq, internal_err};
1565 use datafusion_expr::SortExpr;
1566 use datafusion_physical_expr::create_physical_sort_expr;
1567
1568 pub fn columns(schema: &Schema) -> Vec<String> {
1570 schema.fields().iter().map(|f| f.name().clone()).collect()
1571 }
1572
1573 #[test]
1574 fn physical_plan_config_no_projection() {
1575 let file_schema = aggr_test_schema();
1576 let conf = config_for_projection(
1577 Arc::clone(&file_schema),
1578 None,
1579 Statistics::new_unknown(&file_schema),
1580 to_partition_cols(vec![(
1581 "date".to_owned(),
1582 wrap_partition_type_in_dict(DataType::Utf8),
1583 )]),
1584 );
1585
1586 let (proj_schema, _, proj_statistics, _) = conf.project();
1587 assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1588 assert_eq!(
1589 proj_schema.field(file_schema.fields().len()).name(),
1590 "date",
1591 "partition columns are the last columns"
1592 );
1593 assert_eq!(
1594 proj_statistics.column_statistics.len(),
1595 file_schema.fields().len() + 1
1596 );
1597 let col_names = conf.projected_file_column_names();
1600 assert_eq!(col_names, None);
1601
1602 let col_indices = conf.file_column_projection_indices();
1603 assert_eq!(col_indices, None);
1604 }
1605
1606 #[test]
1607 fn physical_plan_config_no_projection_tab_cols_as_field() {
1608 let file_schema = aggr_test_schema();
1609
1610 let table_partition_col =
1612 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1613 .with_metadata(HashMap::from_iter(vec![(
1614 "key_whatever".to_owned(),
1615 "value_whatever".to_owned(),
1616 )]));
1617
1618 let conf = config_for_projection(
1619 Arc::clone(&file_schema),
1620 None,
1621 Statistics::new_unknown(&file_schema),
1622 vec![table_partition_col.clone()],
1623 );
1624
1625 let proj_schema = conf.projected_schema();
1627 assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1628 assert_eq!(
1629 *proj_schema.field(file_schema.fields().len()),
1630 table_partition_col,
1631 "partition columns are the last columns and ust have all values defined in created field"
1632 );
1633 }
1634
1635 #[test]
1636 fn physical_plan_config_with_projection() {
1637 let file_schema = aggr_test_schema();
1638 let conf = config_for_projection(
1639 Arc::clone(&file_schema),
1640 Some(vec![file_schema.fields().len(), 0]),
1641 Statistics {
1642 num_rows: Precision::Inexact(10),
1643 column_statistics: (0..file_schema.fields().len())
1646 .map(|i| ColumnStatistics {
1647 distinct_count: Precision::Inexact(i),
1648 ..Default::default()
1649 })
1650 .collect(),
1651 total_byte_size: Precision::Absent,
1652 },
1653 to_partition_cols(vec![(
1654 "date".to_owned(),
1655 wrap_partition_type_in_dict(DataType::Utf8),
1656 )]),
1657 );
1658
1659 let (proj_schema, _, proj_statistics, _) = conf.project();
1660 assert_eq!(
1661 columns(&proj_schema),
1662 vec!["date".to_owned(), "c1".to_owned()]
1663 );
1664 let proj_stat_cols = proj_statistics.column_statistics;
1665 assert_eq!(proj_stat_cols.len(), 2);
1666 assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1669
1670 let col_names = conf.projected_file_column_names();
1671 assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1672
1673 let col_indices = conf.file_column_projection_indices();
1674 assert_eq!(col_indices, Some(vec![0]));
1675 }
1676
1677 #[test]
1678 fn partition_column_projector() {
1679 let file_batch = build_table_i32(
1680 ("a", &vec![0, 1, 2]),
1681 ("b", &vec![-2, -1, 0]),
1682 ("c", &vec![10, 11, 12]),
1683 );
1684 let partition_cols = vec![
1685 (
1686 "year".to_owned(),
1687 wrap_partition_type_in_dict(DataType::Utf8),
1688 ),
1689 (
1690 "month".to_owned(),
1691 wrap_partition_type_in_dict(DataType::Utf8),
1692 ),
1693 (
1694 "day".to_owned(),
1695 wrap_partition_type_in_dict(DataType::Utf8),
1696 ),
1697 ];
1698 let statistics = Statistics {
1700 num_rows: Precision::Inexact(3),
1701 total_byte_size: Precision::Absent,
1702 column_statistics: Statistics::unknown_column(&file_batch.schema()),
1703 };
1704
1705 let conf = config_for_projection(
1706 file_batch.schema(),
1707 Some(vec![
1709 0,
1710 1,
1711 2,
1712 file_batch.schema().fields().len(),
1713 file_batch.schema().fields().len() + 2,
1714 ]),
1715 statistics.clone(),
1716 to_partition_cols(partition_cols.clone()),
1717 );
1718
1719 let source_statistics = conf.file_source.statistics().unwrap();
1720 let conf_stats = conf.statistics().unwrap();
1721
1722 assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1724
1725 assert_eq!(conf_stats.column_statistics.len(), 5);
1727
1728 assert_eq!(source_statistics, statistics);
1730 assert_eq!(source_statistics.column_statistics.len(), 3);
1731
1732 let proj_schema = conf.projected_schema();
1733 let mut proj = PartitionColumnProjector::new(
1735 proj_schema,
1736 &partition_cols
1737 .iter()
1738 .map(|x| x.0.clone())
1739 .collect::<Vec<_>>(),
1740 );
1741
1742 let projected_batch = proj
1744 .project(
1745 file_batch,
1747 &[
1748 wrap_partition_value_in_dict(ScalarValue::from("2021")),
1749 wrap_partition_value_in_dict(ScalarValue::from("10")),
1750 wrap_partition_value_in_dict(ScalarValue::from("26")),
1751 ],
1752 )
1753 .expect("Projection of partition columns into record batch failed");
1754 let expected = [
1755 "+---+----+----+------+-----+",
1756 "| a | b | c | year | day |",
1757 "+---+----+----+------+-----+",
1758 "| 0 | -2 | 10 | 2021 | 26 |",
1759 "| 1 | -1 | 11 | 2021 | 26 |",
1760 "| 2 | 0 | 12 | 2021 | 26 |",
1761 "+---+----+----+------+-----+",
1762 ];
1763 assert_batches_eq!(expected, &[projected_batch]);
1764
1765 let file_batch = build_table_i32(
1767 ("a", &vec![5, 6, 7, 8, 9]),
1768 ("b", &vec![-10, -9, -8, -7, -6]),
1769 ("c", &vec![12, 13, 14, 15, 16]),
1770 );
1771 let projected_batch = proj
1772 .project(
1773 file_batch,
1775 &[
1776 wrap_partition_value_in_dict(ScalarValue::from("2021")),
1777 wrap_partition_value_in_dict(ScalarValue::from("10")),
1778 wrap_partition_value_in_dict(ScalarValue::from("27")),
1779 ],
1780 )
1781 .expect("Projection of partition columns into record batch failed");
1782 let expected = [
1783 "+---+-----+----+------+-----+",
1784 "| a | b | c | year | day |",
1785 "+---+-----+----+------+-----+",
1786 "| 5 | -10 | 12 | 2021 | 27 |",
1787 "| 6 | -9 | 13 | 2021 | 27 |",
1788 "| 7 | -8 | 14 | 2021 | 27 |",
1789 "| 8 | -7 | 15 | 2021 | 27 |",
1790 "| 9 | -6 | 16 | 2021 | 27 |",
1791 "+---+-----+----+------+-----+",
1792 ];
1793 assert_batches_eq!(expected, &[projected_batch]);
1794
1795 let file_batch = build_table_i32(
1797 ("a", &vec![0, 1, 3]),
1798 ("b", &vec![2, 3, 4]),
1799 ("c", &vec![4, 5, 6]),
1800 );
1801 let projected_batch = proj
1802 .project(
1803 file_batch,
1805 &[
1806 wrap_partition_value_in_dict(ScalarValue::from("2021")),
1807 wrap_partition_value_in_dict(ScalarValue::from("10")),
1808 wrap_partition_value_in_dict(ScalarValue::from("28")),
1809 ],
1810 )
1811 .expect("Projection of partition columns into record batch failed");
1812 let expected = [
1813 "+---+---+---+------+-----+",
1814 "| a | b | c | year | day |",
1815 "+---+---+---+------+-----+",
1816 "| 0 | 2 | 4 | 2021 | 28 |",
1817 "| 1 | 3 | 5 | 2021 | 28 |",
1818 "| 3 | 4 | 6 | 2021 | 28 |",
1819 "+---+---+---+------+-----+",
1820 ];
1821 assert_batches_eq!(expected, &[projected_batch]);
1822
1823 let file_batch = build_table_i32(
1825 ("a", &vec![0, 1, 2]),
1826 ("b", &vec![-2, -1, 0]),
1827 ("c", &vec![10, 11, 12]),
1828 );
1829 let projected_batch = proj
1830 .project(
1831 file_batch,
1833 &[
1834 ScalarValue::from("2021"),
1835 ScalarValue::from("10"),
1836 ScalarValue::from("26"),
1837 ],
1838 )
1839 .expect("Projection of partition columns into record batch failed");
1840 let expected = [
1841 "+---+----+----+------+-----+",
1842 "| a | b | c | year | day |",
1843 "+---+----+----+------+-----+",
1844 "| 0 | -2 | 10 | 2021 | 26 |",
1845 "| 1 | -1 | 11 | 2021 | 26 |",
1846 "| 2 | 0 | 12 | 2021 | 26 |",
1847 "+---+----+----+------+-----+",
1848 ];
1849 assert_batches_eq!(expected, &[projected_batch]);
1850 }
1851
1852 #[test]
1853 fn test_projected_file_schema_with_partition_col() {
1854 let schema = aggr_test_schema();
1855 let partition_cols = vec![
1856 (
1857 "part1".to_owned(),
1858 wrap_partition_type_in_dict(DataType::Utf8),
1859 ),
1860 (
1861 "part2".to_owned(),
1862 wrap_partition_type_in_dict(DataType::Utf8),
1863 ),
1864 ];
1865
1866 let projection = config_for_projection(
1868 schema.clone(),
1869 Some(vec![0, 3, 5, schema.fields().len()]),
1870 Statistics::new_unknown(&schema),
1871 to_partition_cols(partition_cols),
1872 )
1873 .projected_file_schema();
1874
1875 let expected_columns = vec!["c1", "c4", "c6"];
1877 let actual_columns = projection
1878 .fields()
1879 .iter()
1880 .map(|f| f.name().clone())
1881 .collect::<Vec<_>>();
1882 assert_eq!(expected_columns, actual_columns);
1883 }
1884
1885 #[test]
1886 fn test_projected_file_schema_without_projection() {
1887 let schema = aggr_test_schema();
1888 let partition_cols = vec![
1889 (
1890 "part1".to_owned(),
1891 wrap_partition_type_in_dict(DataType::Utf8),
1892 ),
1893 (
1894 "part2".to_owned(),
1895 wrap_partition_type_in_dict(DataType::Utf8),
1896 ),
1897 ];
1898
1899 let projection = config_for_projection(
1901 schema.clone(),
1902 None,
1903 Statistics::new_unknown(&schema),
1904 to_partition_cols(partition_cols),
1905 )
1906 .projected_file_schema();
1907
1908 assert_eq!(projection.fields(), schema.fields());
1910 }
1911
1912 #[test]
1913 fn test_split_groups_by_statistics() -> Result<()> {
1914 use chrono::TimeZone;
1915 use datafusion_common::DFSchema;
1916 use datafusion_expr::execution_props::ExecutionProps;
1917 use object_store::{path::Path, ObjectMeta};
1918
1919 struct File {
1920 name: &'static str,
1921 date: &'static str,
1922 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1923 }
1924 impl File {
1925 fn new(
1926 name: &'static str,
1927 date: &'static str,
1928 statistics: Vec<Option<(f64, f64)>>,
1929 ) -> Self {
1930 Self::new_nullable(
1931 name,
1932 date,
1933 statistics
1934 .into_iter()
1935 .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1936 .collect(),
1937 )
1938 }
1939
1940 fn new_nullable(
1941 name: &'static str,
1942 date: &'static str,
1943 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1944 ) -> Self {
1945 Self {
1946 name,
1947 date,
1948 statistics,
1949 }
1950 }
1951 }
1952
1953 struct TestCase {
1954 name: &'static str,
1955 file_schema: Schema,
1956 files: Vec<File>,
1957 sort: Vec<SortExpr>,
1958 expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1959 }
1960
1961 use datafusion_expr::col;
1962 let cases = vec![
1963 TestCase {
1964 name: "test sort",
1965 file_schema: Schema::new(vec![Field::new(
1966 "value".to_string(),
1967 DataType::Float64,
1968 false,
1969 )]),
1970 files: vec![
1971 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1972 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1973 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1974 ],
1975 sort: vec![col("value").sort(true, false)],
1976 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1977 },
1978 TestCase {
1981 name: "test sort with files ordered differently",
1982 file_schema: Schema::new(vec![Field::new(
1983 "value".to_string(),
1984 DataType::Float64,
1985 false,
1986 )]),
1987 files: vec![
1988 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1989 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1990 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1991 ],
1992 sort: vec![col("value").sort(true, false)],
1993 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1994 },
1995 TestCase {
1996 name: "reverse sort",
1997 file_schema: Schema::new(vec![Field::new(
1998 "value".to_string(),
1999 DataType::Float64,
2000 false,
2001 )]),
2002 files: vec![
2003 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2004 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
2005 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
2006 ],
2007 sort: vec![col("value").sort(false, true)],
2008 expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
2009 },
2010 TestCase {
2011 name: "nullable sort columns, nulls last",
2012 file_schema: Schema::new(vec![Field::new(
2013 "value".to_string(),
2014 DataType::Float64,
2015 true,
2016 )]),
2017 files: vec![
2018 File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
2019 File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
2020 File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
2021 ],
2022 sort: vec![col("value").sort(true, false)],
2023 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
2024 },
2025 TestCase {
2026 name: "nullable sort columns, nulls first",
2027 file_schema: Schema::new(vec![Field::new(
2028 "value".to_string(),
2029 DataType::Float64,
2030 true,
2031 )]),
2032 files: vec![
2033 File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
2034 File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
2035 File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
2036 ],
2037 sort: vec![col("value").sort(true, true)],
2038 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
2039 },
2040 TestCase {
2041 name: "all three non-overlapping",
2042 file_schema: Schema::new(vec![Field::new(
2043 "value".to_string(),
2044 DataType::Float64,
2045 false,
2046 )]),
2047 files: vec![
2048 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2049 File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
2050 File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
2051 ],
2052 sort: vec![col("value").sort(true, false)],
2053 expected_result: Ok(vec![vec!["0", "1", "2"]]),
2054 },
2055 TestCase {
2056 name: "all three overlapping",
2057 file_schema: Schema::new(vec![Field::new(
2058 "value".to_string(),
2059 DataType::Float64,
2060 false,
2061 )]),
2062 files: vec![
2063 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2064 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2065 File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
2066 ],
2067 sort: vec![col("value").sort(true, false)],
2068 expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
2069 },
2070 TestCase {
2071 name: "empty input",
2072 file_schema: Schema::new(vec![Field::new(
2073 "value".to_string(),
2074 DataType::Float64,
2075 false,
2076 )]),
2077 files: vec![],
2078 sort: vec![col("value").sort(true, false)],
2079 expected_result: Ok(vec![]),
2080 },
2081 TestCase {
2082 name: "one file missing statistics",
2083 file_schema: Schema::new(vec![Field::new(
2084 "value".to_string(),
2085 DataType::Float64,
2086 false,
2087 )]),
2088 files: vec![
2089 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2090 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2091 File::new("2", "2023-01-02", vec![None]),
2092 ],
2093 sort: vec![col("value").sort(true, false)],
2094 expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
2095 },
2096 ];
2097
2098 for case in cases {
2099 let table_schema = Arc::new(Schema::new(
2100 case.file_schema
2101 .fields()
2102 .clone()
2103 .into_iter()
2104 .cloned()
2105 .chain(Some(Arc::new(Field::new(
2106 "date".to_string(),
2107 DataType::Utf8,
2108 false,
2109 ))))
2110 .collect::<Vec<_>>(),
2111 ));
2112 let Some(sort_order) = LexOrdering::new(
2113 case.sort
2114 .into_iter()
2115 .map(|expr| {
2116 create_physical_sort_expr(
2117 &expr,
2118 &DFSchema::try_from(table_schema.as_ref().clone())?,
2119 &ExecutionProps::default(),
2120 )
2121 })
2122 .collect::<Result<Vec<_>>>()?,
2123 ) else {
2124 return internal_err!("This test should always use an ordering");
2125 };
2126
2127 let partitioned_files = FileGroup::new(
2128 case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2129 );
2130 let result = FileScanConfig::split_groups_by_statistics(
2131 &table_schema,
2132 &[partitioned_files.clone()],
2133 &sort_order,
2134 );
2135 let results_by_name = result
2136 .as_ref()
2137 .map(|file_groups| {
2138 file_groups
2139 .iter()
2140 .map(|file_group| {
2141 file_group
2142 .iter()
2143 .map(|file| {
2144 partitioned_files
2145 .iter()
2146 .find_map(|f| {
2147 if f.object_meta == file.object_meta {
2148 Some(
2149 f.object_meta
2150 .location
2151 .as_ref()
2152 .rsplit('/')
2153 .next()
2154 .unwrap()
2155 .trim_end_matches(".parquet"),
2156 )
2157 } else {
2158 None
2159 }
2160 })
2161 .unwrap()
2162 })
2163 .collect::<Vec<_>>()
2164 })
2165 .collect::<Vec<_>>()
2166 })
2167 .map_err(|e| e.strip_backtrace().leak() as &'static str);
2168
2169 assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2170 }
2171
2172 return Ok(());
2173
2174 impl From<File> for PartitionedFile {
2175 fn from(file: File) -> Self {
2176 PartitionedFile {
2177 object_meta: ObjectMeta {
2178 location: Path::from(format!(
2179 "data/date={}/{}.parquet",
2180 file.date, file.name
2181 )),
2182 last_modified: chrono::Utc.timestamp_nanos(0),
2183 size: 0,
2184 e_tag: None,
2185 version: None,
2186 },
2187 partition_values: vec![ScalarValue::from(file.date)],
2188 range: None,
2189 statistics: Some(Arc::new(Statistics {
2190 num_rows: Precision::Absent,
2191 total_byte_size: Precision::Absent,
2192 column_statistics: file
2193 .statistics
2194 .into_iter()
2195 .map(|stats| {
2196 stats
2197 .map(|(min, max)| ColumnStatistics {
2198 min_value: Precision::Exact(
2199 ScalarValue::Float64(min),
2200 ),
2201 max_value: Precision::Exact(
2202 ScalarValue::Float64(max),
2203 ),
2204 ..Default::default()
2205 })
2206 .unwrap_or_default()
2207 })
2208 .collect::<Vec<_>>(),
2209 })),
2210 extensions: None,
2211 metadata_size_hint: None,
2212 }
2213 }
2214 }
2215 }
2216
2217 fn config_for_projection(
2219 file_schema: SchemaRef,
2220 projection: Option<Vec<usize>>,
2221 statistics: Statistics,
2222 table_partition_cols: Vec<Field>,
2223 ) -> FileScanConfig {
2224 FileScanConfigBuilder::new(
2225 ObjectStoreUrl::parse("test:///").unwrap(),
2226 file_schema,
2227 Arc::new(MockSource::default()),
2228 )
2229 .with_projection(projection)
2230 .with_statistics(statistics)
2231 .with_table_partition_cols(table_partition_cols)
2232 .build()
2233 }
2234
2235 fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2237 table_partition_cols
2238 .iter()
2239 .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2240 .collect::<Vec<_>>()
2241 }
2242
2243 pub fn build_table_i32(
2245 a: (&str, &Vec<i32>),
2246 b: (&str, &Vec<i32>),
2247 c: (&str, &Vec<i32>),
2248 ) -> RecordBatch {
2249 let schema = Schema::new(vec![
2250 Field::new(a.0, DataType::Int32, false),
2251 Field::new(b.0, DataType::Int32, false),
2252 Field::new(c.0, DataType::Int32, false),
2253 ]);
2254
2255 RecordBatch::try_new(
2256 Arc::new(schema),
2257 vec![
2258 Arc::new(Int32Array::from(a.1.clone())),
2259 Arc::new(Int32Array::from(b.1.clone())),
2260 Arc::new(Int32Array::from(c.1.clone())),
2261 ],
2262 )
2263 .unwrap()
2264 }
2265
2266 #[test]
2267 fn test_file_scan_config_builder() {
2268 let file_schema = aggr_test_schema();
2269 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2270 let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2271
2272 let builder = FileScanConfigBuilder::new(
2274 object_store_url.clone(),
2275 Arc::clone(&file_schema),
2276 Arc::clone(&file_source),
2277 );
2278
2279 let config = builder
2281 .with_limit(Some(1000))
2282 .with_projection(Some(vec![0, 1]))
2283 .with_table_partition_cols(vec![Field::new(
2284 "date",
2285 wrap_partition_type_in_dict(DataType::Utf8),
2286 false,
2287 )])
2288 .with_statistics(Statistics::new_unknown(&file_schema))
2289 .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2290 "test.parquet".to_string(),
2291 1024,
2292 )])])
2293 .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
2294 Column::new("date", 0),
2295 ))]
2296 .into()])
2297 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2298 .with_newlines_in_values(true)
2299 .build();
2300
2301 assert_eq!(config.object_store_url, object_store_url);
2303 assert_eq!(config.file_schema, file_schema);
2304 assert_eq!(config.limit, Some(1000));
2305 assert_eq!(config.projection, Some(vec![0, 1]));
2306 assert_eq!(config.table_partition_cols.len(), 1);
2307 assert_eq!(config.table_partition_cols[0].name(), "date");
2308 assert_eq!(config.file_groups.len(), 1);
2309 assert_eq!(config.file_groups[0].len(), 1);
2310 assert_eq!(
2311 config.file_groups[0][0].object_meta.location.as_ref(),
2312 "test.parquet"
2313 );
2314 assert_eq!(
2315 config.file_compression_type,
2316 FileCompressionType::UNCOMPRESSED
2317 );
2318 assert!(config.new_lines_in_values);
2319 assert_eq!(config.output_ordering.len(), 1);
2320 }
2321
2322 #[test]
2323 fn test_file_scan_config_builder_defaults() {
2324 let file_schema = aggr_test_schema();
2325 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2326 let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2327
2328 let config = FileScanConfigBuilder::new(
2330 object_store_url.clone(),
2331 Arc::clone(&file_schema),
2332 Arc::clone(&file_source),
2333 )
2334 .build();
2335
2336 assert_eq!(config.object_store_url, object_store_url);
2338 assert_eq!(config.file_schema, file_schema);
2339 assert_eq!(config.limit, None);
2340 assert_eq!(config.projection, None);
2341 assert!(config.table_partition_cols.is_empty());
2342 assert!(config.file_groups.is_empty());
2343 assert_eq!(
2344 config.file_compression_type,
2345 FileCompressionType::UNCOMPRESSED
2346 );
2347 assert!(!config.new_lines_in_values);
2348 assert!(config.output_ordering.is_empty());
2349 assert!(config.constraints.is_empty());
2350
2351 assert_eq!(
2353 config.file_source.statistics().unwrap().num_rows,
2354 Precision::Absent
2355 );
2356 assert_eq!(
2357 config.file_source.statistics().unwrap().total_byte_size,
2358 Precision::Absent
2359 );
2360 assert_eq!(
2361 config
2362 .file_source
2363 .statistics()
2364 .unwrap()
2365 .column_statistics
2366 .len(),
2367 file_schema.fields().len()
2368 );
2369 for stat in config.file_source.statistics().unwrap().column_statistics {
2370 assert_eq!(stat.distinct_count, Precision::Absent);
2371 assert_eq!(stat.min_value, Precision::Absent);
2372 assert_eq!(stat.max_value, Precision::Absent);
2373 assert_eq!(stat.null_count, Precision::Absent);
2374 }
2375 }
2376
2377 #[test]
2378 fn test_file_scan_config_builder_new_from() {
2379 let schema = aggr_test_schema();
2380 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2381 let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2382 let partition_cols = vec![Field::new(
2383 "date",
2384 wrap_partition_type_in_dict(DataType::Utf8),
2385 false,
2386 )];
2387 let file = PartitionedFile::new("test_file.parquet", 100);
2388
2389 let original_config = FileScanConfigBuilder::new(
2391 object_store_url.clone(),
2392 Arc::clone(&schema),
2393 Arc::clone(&file_source),
2394 )
2395 .with_projection(Some(vec![0, 2]))
2396 .with_limit(Some(10))
2397 .with_table_partition_cols(partition_cols.clone())
2398 .with_file(file.clone())
2399 .with_constraints(Constraints::default())
2400 .with_newlines_in_values(true)
2401 .build();
2402
2403 let new_builder = FileScanConfigBuilder::from(original_config);
2405
2406 let new_config = new_builder.build();
2408
2409 let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2411 assert_eq!(new_config.object_store_url, object_store_url);
2412 assert_eq!(new_config.file_schema, schema);
2413 assert_eq!(new_config.projection, Some(vec![0, 2]));
2414 assert_eq!(new_config.limit, Some(10));
2415 assert_eq!(new_config.table_partition_cols, partition_cols);
2416 assert_eq!(new_config.file_groups.len(), 1);
2417 assert_eq!(new_config.file_groups[0].len(), 1);
2418 assert_eq!(
2419 new_config.file_groups[0][0].object_meta.location.as_ref(),
2420 "test_file.parquet"
2421 );
2422 assert_eq!(new_config.constraints, Constraints::default());
2423 assert!(new_config.new_lines_in_values);
2424 }
2425
2426 #[test]
2427 fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2428 use datafusion_common::DFSchema;
2429 use datafusion_expr::{col, execution_props::ExecutionProps};
2430
2431 let schema = Arc::new(Schema::new(vec![Field::new(
2432 "value",
2433 DataType::Float64,
2434 false,
2435 )]));
2436
2437 let exec_props = ExecutionProps::new();
2439 let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2440 let sort_expr = [col("value").sort(true, false)];
2441 let sort_ordering = sort_expr
2442 .map(|expr| {
2443 create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2444 })
2445 .into();
2446
2447 struct TestCase {
2449 name: String,
2450 file_count: usize,
2451 overlap_factor: f64,
2452 target_partitions: usize,
2453 expected_partition_count: usize,
2454 }
2455
2456 let test_cases = vec![
2457 TestCase {
2459 name: "no_overlap_10_files_4_partitions".to_string(),
2460 file_count: 10,
2461 overlap_factor: 0.0,
2462 target_partitions: 4,
2463 expected_partition_count: 4,
2464 },
2465 TestCase {
2466 name: "medium_overlap_20_files_5_partitions".to_string(),
2467 file_count: 20,
2468 overlap_factor: 0.5,
2469 target_partitions: 5,
2470 expected_partition_count: 5,
2471 },
2472 TestCase {
2473 name: "high_overlap_30_files_3_partitions".to_string(),
2474 file_count: 30,
2475 overlap_factor: 0.8,
2476 target_partitions: 3,
2477 expected_partition_count: 7,
2478 },
2479 TestCase {
2481 name: "fewer_files_than_partitions".to_string(),
2482 file_count: 3,
2483 overlap_factor: 0.0,
2484 target_partitions: 10,
2485 expected_partition_count: 3, // only as many groups as files
            },
2487 TestCase {
2488 name: "single_file".to_string(),
2489 file_count: 1,
2490 overlap_factor: 0.0,
2491 target_partitions: 5,
2492 expected_partition_count: 1, // a single file forms a single group
            },
2494 TestCase {
2495 name: "empty_files".to_string(),
2496 file_count: 0,
2497 overlap_factor: 0.0,
2498 target_partitions: 3,
2499 expected_partition_count: 0, // no files, no groups
            },
2501 ];
2502
2503 for case in test_cases {
2504 println!("Running test case: {}", case.name);
2505
2506 let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2508
2509 let result =
2511 FileScanConfig::split_groups_by_statistics_with_target_partitions(
2512 &schema,
2513 &file_groups,
2514 &sort_ordering,
2515 case.target_partitions,
2516 )?;
2517
2518 println!(
2520 "Created {} partitions (target was {})",
2521 result.len(),
2522 case.target_partitions
2523 );
2524
2525 assert_eq!(
2527 result.len(),
2528 case.expected_partition_count,
2529 "Case '{}': Unexpected partition count",
2530 case.name
2531 );
2532
2533 assert!(
2535 verify_sort_integrity(&result),
2536 "Case '{}': Files within partitions are not properly ordered",
2537 case.name
2538 );
2539
2540 if case.file_count > 1 && case.expected_partition_count > 1 {
2542 let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2543 let max_size = *group_sizes.iter().max().unwrap();
2544 let min_size = *group_sizes.iter().min().unwrap();
2545
2546 let avg_files_per_partition =
2548 case.file_count as f64 / case.expected_partition_count as f64;
2549 assert!(
2550 (max_size as f64) < 2.0 * avg_files_per_partition,
2551 "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2552 case.name,
2553 max_size,
2554 avg_files_per_partition
2555 );
2556
2557 println!("Distribution - min files: {min_size}, max files: {max_size}");
2558 }
2559 }
2560
2561 let empty_groups: Vec<FileGroup> = vec![];
2563 let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2564 &schema,
2565 &empty_groups,
2566 &sort_ordering,
2567 0,
2568 )
2569 .unwrap_err();
2570
2571 assert!(
2572 err.to_string()
2573 .contains("target_partitions must be greater than 0"),
2574 "Expected error for zero target partitions"
2575 );
2576
2577 Ok(())
2578 }
2579}