1use crate::file_groups::FileGroup;
22use crate::{
23 PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24 file_compression_type::FileCompressionType, file_stream::FileStream,
25 source::DataSource, statistics::MinMaxStatistics,
26};
27use arrow::datatypes::FieldRef;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::{
31 Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32};
33use datafusion_execution::{
34 SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
35};
36use datafusion_expr::Operator;
37
38use datafusion_physical_expr::equivalence::project_orderings;
39use datafusion_physical_expr::expressions::{BinaryExpr, Column};
40use datafusion_physical_expr::projection::ProjectionExprs;
41use datafusion_physical_expr::utils::reassign_expr_columns;
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
46use datafusion_physical_plan::SortOrderPushdownResult;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use datafusion_physical_plan::{
50 DisplayAs, DisplayFormatType,
51 display::{ProjectSchemaDisplay, display_orderings},
52 filter_pushdown::FilterPushdownPropagation,
53 metrics::ExecutionPlanMetricsSet,
54};
55use log::{debug, warn};
56use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
57
/// Describes how a collection of files is scanned as a single DataFusion
/// `DataSource`: where the files live, how they are grouped into partitions,
/// and how rows are decoded, ordered, and limited.
///
/// Instances are normally constructed via [`FileScanConfigBuilder`].
#[derive(Clone)]
pub struct FileScanConfig {
    /// URL of the object store that hosts every file in `file_groups`.
    pub object_store_url: ObjectStoreUrl,
    /// Files to scan; each inner `FileGroup` is read as one partition.
    pub file_groups: Vec<FileGroup>,
    /// Table constraints (e.g. primary/unique keys) propagated to planning.
    pub constraints: Constraints,
    /// Optional cap on the total number of rows produced by the scan.
    pub limit: Option<usize>,
    /// When true, the scan must not reorder rows. Forced on by
    /// `FileScanConfigBuilder::build` whenever `output_ordering` is non-empty.
    pub preserve_order: bool,
    /// Sort orderings the data is declared to satisfy; validated against
    /// file-level min/max statistics before being advertised (see
    /// `validated_output_ordering`).
    pub output_ordering: Vec<LexOrdering>,
    /// Compression applied to the files (defaults to uncompressed).
    pub file_compression_type: FileCompressionType,
    /// Format-specific reader (Parquet, CSV, ...) that actually opens files.
    pub file_source: Arc<dyn FileSource>,
    /// Per-scan batch size override; `None` falls back to the session config.
    pub batch_size: Option<usize>,
    /// Optional factory used to adapt physical expressions to each file's
    /// actual schema.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    // Raw table-level statistics. Read through `Self::statistics()`, which
    // downgrades them to inexact when a pushed-down filter may drop rows.
    pub(crate) statistics: Statistics,
    /// When true, each file group corresponds to exactly one combination of
    /// table partition values, enabling the hash-partitioning claim in
    /// `output_partitioning` and disabling repartitioning.
    pub partitioned_by_file_group: bool,
}
208
/// Builder for [`FileScanConfig`].
///
/// Optional settings are stored as `Option`/empty collections and defaulted
/// by [`FileScanConfigBuilder::build`]; fields mirror those on the config.
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    preserve_order: bool,
    // `None` means "no constraints supplied"; defaulted at build time.
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    // `None` means "unknown"; replaced with `Statistics::new_unknown` at build.
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    // `None` defaults to `FileCompressionType::UNCOMPRESSED` at build time.
    file_compression_type: Option<FileCompressionType>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    partitioned_by_file_group: bool,
}
278
279impl FileScanConfigBuilder {
280 pub fn new(
287 object_store_url: ObjectStoreUrl,
288 file_source: Arc<dyn FileSource>,
289 ) -> Self {
290 Self {
291 object_store_url,
292 file_source,
293 file_groups: vec![],
294 statistics: None,
295 output_ordering: vec![],
296 file_compression_type: None,
297 limit: None,
298 preserve_order: false,
299 constraints: None,
300 batch_size: None,
301 expr_adapter_factory: None,
302 partitioned_by_file_group: false,
303 }
304 }
305
306 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
310 self.limit = limit;
311 self
312 }
313
314 pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
321 self.preserve_order = order_sensitive;
322 self
323 }
324
325 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
330 self.file_source = file_source;
331 self
332 }
333
334 pub fn table_schema(&self) -> &SchemaRef {
336 self.file_source.table_schema().table_schema()
337 }
338
339 #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
345 pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
346 match self.clone().with_projection_indices(indices) {
347 Ok(builder) => builder,
348 Err(e) => {
349 warn!(
350 "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
351 );
352 self
353 }
354 }
355 }
356
357 pub fn with_projection_indices(
366 mut self,
367 indices: Option<Vec<usize>>,
368 ) -> Result<Self> {
369 let projection_exprs = indices.map(|indices| {
370 ProjectionExprs::from_indices(
371 &indices,
372 self.file_source.table_schema().table_schema(),
373 )
374 });
375 let Some(projection_exprs) = projection_exprs else {
376 return Ok(self);
377 };
378 let new_source = self
379 .file_source
380 .try_pushdown_projection(&projection_exprs)
381 .map_err(|e| {
382 internal_datafusion_err!(
383 "Failed to push down projection in FileScanConfigBuilder::build: {e}"
384 )
385 })?;
386 if let Some(new_source) = new_source {
387 self.file_source = new_source;
388 } else {
389 internal_err!(
390 "FileSource {} does not support projection pushdown",
391 self.file_source.file_type()
392 )?;
393 }
394 Ok(self)
395 }
396
397 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
399 self.constraints = Some(constraints);
400 self
401 }
402
403 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
416 self.statistics = Some(statistics);
417 self
418 }
419
420 pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
430 self.file_groups = file_groups;
431 self
432 }
433
434 pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
438 self.file_groups.push(file_group);
439 self
440 }
441
442 pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
446 self.with_file_group(FileGroup::new(vec![partitioned_file]))
447 }
448
449 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
458 self.output_ordering = output_ordering;
459 self
460 }
461
462 pub fn with_file_compression_type(
464 mut self,
465 file_compression_type: FileCompressionType,
466 ) -> Self {
467 self.file_compression_type = Some(file_compression_type);
468 self
469 }
470
471 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
473 self.batch_size = batch_size;
474 self
475 }
476
477 pub fn with_expr_adapter(
484 mut self,
485 expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
486 ) -> Self {
487 self.expr_adapter_factory = expr_adapter;
488 self
489 }
490
491 pub fn with_partitioned_by_file_group(
496 mut self,
497 partitioned_by_file_group: bool,
498 ) -> Self {
499 self.partitioned_by_file_group = partitioned_by_file_group;
500 self
501 }
502
503 pub fn build(self) -> FileScanConfig {
511 let Self {
512 object_store_url,
513 file_source,
514 limit,
515 preserve_order,
516 constraints,
517 file_groups,
518 statistics,
519 output_ordering,
520 file_compression_type,
521 batch_size,
522 expr_adapter_factory: expr_adapter,
523 partitioned_by_file_group,
524 } = self;
525
526 let constraints = constraints.unwrap_or_default();
527 let statistics = statistics.unwrap_or_else(|| {
528 Statistics::new_unknown(file_source.table_schema().table_schema())
529 });
530 let file_compression_type =
531 file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
532
533 let preserve_order = preserve_order || !output_ordering.is_empty();
535
536 FileScanConfig {
537 object_store_url,
538 file_source,
539 limit,
540 preserve_order,
541 constraints,
542 file_groups,
543 output_ordering,
544 file_compression_type,
545 batch_size,
546 expr_adapter_factory: expr_adapter,
547 statistics,
548 partitioned_by_file_group,
549 }
550 }
551}
552
553impl From<FileScanConfig> for FileScanConfigBuilder {
554 fn from(config: FileScanConfig) -> Self {
555 Self {
556 object_store_url: config.object_store_url,
557 file_source: Arc::<dyn FileSource>::clone(&config.file_source),
558 file_groups: config.file_groups,
559 statistics: Some(config.statistics),
560 output_ordering: config.output_ordering,
561 file_compression_type: Some(config.file_compression_type),
562 limit: config.limit,
563 preserve_order: config.preserve_order,
564 constraints: Some(config.constraints),
565 batch_size: config.batch_size,
566 expr_adapter_factory: config.expr_adapter_factory,
567 partitioned_by_file_group: config.partitioned_by_file_group,
568 }
569 }
570}
571
impl DataSource for FileScanConfig {
    /// Opens `partition` (one entry of `file_groups`) as a record batch
    /// stream, resolving the object store from the task context.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        // An explicit per-scan batch size wins; otherwise use the session's.
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self.file_source.with_batch_size(batch_size);

        let opener = source.create_file_opener(object_store, self, partition)?;

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        // `cooperative` makes the stream yield to the scheduler periodically,
        // matching the `SchedulingType::Cooperative` reported below.
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Formats the scan for EXPLAIN output: the tree renderer gets a compact
    /// format/file-count form, other formats get the full detail line.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                // `Formatter` errors carry no payload, so schema failures
                // degrade to a plain fmt::Error.
                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    if let Some(projection) = self.file_source.projection() {
                        // Render a plain, un-aliased column by name and
                        // anything else as `expr as alias`.
                        let expr: Vec<String> = projection
                            .as_ref()
                            .iter()
                            .map(|proj_expr| {
                                if let Some(column) =
                                    proj_expr.expr.as_any().downcast_ref::<Column>()
                                {
                                    if column.name() == proj_expr.alias {
                                        column.name().to_string()
                                    } else {
                                        format!(
                                            "{} as {}",
                                            proj_expr.expr, proj_expr.alias
                                        )
                                    }
                                } else {
                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
                                }
                            })
                            .collect();
                        write!(f, ", projection=[{}]", expr.join(", "))?;
                    } else {
                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                    }
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    /// Asks the file source to redistribute files into `target_partitions`
    /// groups; `Ok(None)` means repartitioning is declined.
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        // Groups that mirror table partitions must stay intact, otherwise
        // the hash-partitioning claim in `output_partitioning` would break.
        if self.partitioned_by_file_group {
            return Ok(None);
        }

        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    /// Reports hash partitioning on the table partition columns when each
    /// file group holds one partition value and all partition columns
    /// survive the projection; otherwise unknown partitioning.
    fn output_partitioning(&self) -> Partitioning {
        if self.partitioned_by_file_group {
            let partition_cols = self.table_partition_cols();
            if !partition_cols.is_empty() {
                let projected_schema = match self.projected_schema() {
                    Ok(schema) => schema,
                    Err(_) => {
                        debug!(
                            "Could not get projected schema, falling back to UnknownPartitioning."
                        );
                        return Partitioning::UnknownPartitioning(self.file_groups.len());
                    }
                };

                // Map each partition column to its index in the projected
                // schema; columns projected away are simply skipped here.
                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
                for partition_col in partition_cols {
                    if let Some((idx, _)) = projected_schema
                        .fields()
                        .iter()
                        .enumerate()
                        .find(|(_, f)| f.name() == partition_col.name())
                    {
                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
                    }
                }

                // Only claim hash partitioning if *every* partition column
                // survived the projection.
                if exprs.len() == partition_cols.len() {
                    return Partitioning::Hash(exprs, self.file_groups.len());
                }
            }
        }
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    /// Builds equivalence properties from the validated orderings,
    /// constraints, filter-implied column equalities, and (when present)
    /// the projection mapping.
    fn eq_properties(&self) -> EquivalenceProperties {
        let schema = self.file_source.table_schema().table_schema();
        let mut eq_properties = EquivalenceProperties::new_with_orderings(
            Arc::clone(schema),
            self.validated_output_ordering(),
        )
        .with_constraints(self.constraints.clone());

        if let Some(filter) = self.file_source.filter() {
            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
                Ok(()) => {}
                Err(e) => {
                    // Equivalence info is an optimization: release builds
                    // warn and continue, debug builds panic to surface bugs.
                    warn!("Failed to add filter equivalence info: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to add filter equivalence info: {e}");
                }
            }
        }

        if let Some(projection) = self.file_source.projection() {
            match (
                projection.project_schema(schema),
                projection.projection_mapping(schema),
            ) {
                (Ok(output_schema), Ok(mapping)) => {
                    eq_properties =
                        eq_properties.project(&mapping, Arc::new(output_schema));
                }
                (Err(e), _) | (_, Err(e)) => {
                    warn!("Failed to project equivalence properties: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to project equivalence properties: {e}");
                }
            }
        }

        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        // Streams are wrapped in `cooperative` in `open`, so they yield.
        SchedulingType::Cooperative
    }

    /// Returns (projected) statistics for one partition, or for the whole
    /// scan when `partition` is `None`; unknown statistics when per-group
    /// statistics are unavailable.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(partition) = partition {
            if let Some(file_group) = self.file_groups.get(partition)
                && let Some(stat) = file_group.file_statistics(None)
            {
                let output_schema = self.projected_schema()?;
                return if let Some(projection) = self.file_source.projection() {
                    projection.project_statistics(stat.clone(), &output_schema)
                } else {
                    Ok(stat.clone())
                };
            }
            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
        } else {
            // `statistics()` already downgrades to inexact under a filter.
            let statistics = self.statistics();
            let projection = self.file_source.projection();
            let output_schema = self.projected_schema()?;
            if let Some(projection) = &projection {
                projection.project_statistics(statistics.clone(), &output_schema)
            } else {
                Ok(statistics)
            }
        }
    }

    /// Returns a copy of this scan with a different row limit.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    /// Attempts to absorb a projection into the file source; `Ok(None)`
    /// means the source cannot handle it and the projection stays in the
    /// plan above the scan.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        match self.file_source.try_pushdown_projection(projection)? {
            Some(new_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_source;
                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
            }
            None => Ok(None),
        }
    }

    /// Rewrites incoming filters from projected-output column references
    /// back to the table schema before offering them to the file source.
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let table_schema = self.file_source.table_schema().table_schema();
        // Filters arrive in terms of the projected output; undo the
        // projection first so column references match the table schema.
        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
            filters
                .into_iter()
                .map(|filter| projection.unproject_expr(&filter))
                .collect::<Result<Vec<_>>>()?
        } else {
            filters
        };
        // Re-anchor column indices against the table schema.
        let remapped_filters = filters_to_remap
            .into_iter()
            .map(|filter| reassign_expr_columns(filter, table_schema))
            .collect::<Result<Vec<_>>>()?;

        let result = self
            .file_source
            .try_pushdown_filters(remapped_filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_file_source;
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(new_file_scan_config) as _),
                })
            }
            None => {
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: None,
                })
            }
        }
    }

    /// Delegates sort pushdown to the file source, then rebuilds the config
    /// to match the (exact or inexact) pushdown result.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        let pushdown_result = self
            .file_source
            .try_pushdown_sort(order, &self.eq_properties())?;

        match pushdown_result {
            SortOrderPushdownResult::Exact { inner } => {
                Ok(SortOrderPushdownResult::Exact {
                    inner: self.rebuild_with_source(inner, true, order)?,
                })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                Ok(SortOrderPushdownResult::Inexact {
                    inner: self.rebuild_with_source(inner, false, order)?,
                })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }

    /// Returns a copy of this scan with the given `preserve_order` flag;
    /// a clone of self when the flag is already set accordingly.
    fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
        if self.preserve_order == preserve_order {
            return Some(Arc::new(self.clone()));
        }

        let new_config = FileScanConfig {
            preserve_order,
            ..self.clone()
        };
        Some(Arc::new(new_config))
    }
}
927
impl FileScanConfig {
    /// Returns only those declared output orderings that the file groups
    /// provably satisfy according to file-level min/max statistics.
    fn validated_output_ordering(&self) -> Vec<LexOrdering> {
        let schema = self.file_source.table_schema().table_schema();
        validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
    }

    /// Schema of the data as stored in the files (no partition columns).
    pub fn file_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().file_schema()
    }

    /// Columns whose values come from the directory layout rather than the
    /// file contents.
    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.file_source.table_schema().table_partition_cols()
    }

    /// Table statistics, downgraded to inexact when a pushed-down filter
    /// may remove rows the stored statistics still count.
    pub fn statistics(&self) -> Statistics {
        if self.file_source.filter().is_some() {
            self.statistics.clone().to_inexact()
        } else {
            self.statistics.clone()
        }
    }

    /// Output schema after applying the file source's projection, if any.
    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
        let schema = self.file_source.table_schema().table_schema();
        match self.file_source.projection() {
            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
            None => Ok(Arc::clone(schema)),
        }
    }

    /// Registers `a = b` equalities found among the filter's conjuncts as
    /// equivalence-class information on `eq_properties`.
    fn add_filter_equivalence_info(
        filter: &Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
        schema: &Schema,
    ) -> Result<()> {
        // Only plain `=` binary comparisons contribute; conjuncts whose
        // columns cannot be reassigned to `schema` are silently skipped.
        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
            reassign_expr_columns(Arc::clone(expr), schema)
                .ok()
                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
                    Some(expr) if expr.op() == &Operator::Eq => {
                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
                    }
                    _ => None,
                })
        });

        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(lhs, rhs)?
        }

        Ok(())
    }

    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }

    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn projected_constraints(&self) -> Constraints {
        let props = self.eq_properties();
        props.constraints().clone()
    }

    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        #[expect(deprecated)]
        self.file_source.projection().as_ref().map(|p| {
            p.ordered_column_indices()
                .into_iter()
                // Keep only indices that refer to file columns, i.e. drop
                // indices pointing at appended partition columns.
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }

    /// Redistributes files into (at least) `target_partitions` groups such
    /// that within each group files have non-overlapping min/max ranges on
    /// `sort_order`, preferring the smallest compatible group for balance.
    ///
    /// # Errors
    /// Fails when `target_partitions` is zero or min/max statistics cannot
    /// be built for the sort columns.
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(internal_datafusion_err!(
                "target_partitions must be greater than 0"
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        // Pre-allocate the desired number of (initially empty) groups.
        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        // Greedy best-fit: place each file (ascending by min value) into the
        // smallest compatible group — one that is empty or whose last file's
        // max lies strictly below this file's min.
        for (idx, min) in indices_sorted_by_min {
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                // No compatible group: overlapping ranges force an extra
                // group beyond `target_partitions`.
                file_groups_indices.push(vec![idx]);
            }
        }

        // Drop pre-allocated groups that never received a file.
        file_groups_indices.retain(|group| !group.is_empty());

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

    /// Redistributes files into the minimal number of groups such that files
    /// within each group have non-overlapping min/max ranges on `sort_order`
    /// (first-fit over groups in creation order).
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        // First-fit: append to the first existing group whose last file ends
        // before this file starts, otherwise open a new group.
        for (idx, min) in indices_sorted_by_min {
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    /// Appends the file-source portion of the EXPLAIN line.
    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    /// Returns the format-specific file source backing this scan.
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }

    /// Clones this config with `new_file_source`, reversing file order
    /// within each group when the requested sort is the exact reverse of a
    /// declared ordering, and clearing declared orderings when the pushdown
    /// was only inexact.
    fn rebuild_with_source(
        &self,
        new_file_source: Arc<dyn FileSource>,
        is_exact: bool,
        order: &[PhysicalSortExpr],
    ) -> Result<Arc<dyn DataSource>> {
        let mut new_config = self.clone();

        // Detect whether the requested order is the reverse of any declared
        // ordering (projected onto the output schema); `LexOrdering::new`
        // returns `None` for an empty requested order.
        let reverse_file_groups = if self.output_ordering.is_empty() {
            false
        } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) {
            let projected_schema = self.projected_schema()?;
            let orderings = project_orderings(&self.output_ordering, &projected_schema);
            orderings
                .iter()
                .any(|ordering| ordering.is_reverse(&requested))
        } else {
            false
        };

        if reverse_file_groups {
            new_config.file_groups = new_config
                .file_groups
                .into_iter()
                .map(|group| {
                    let mut files = group.into_inner();
                    files.reverse();
                    files.into()
                })
                .collect();
        }

        new_config.file_source = new_file_source;

        if !is_exact {
            // An inexact pushdown cannot guarantee the declared orderings.
            new_config.output_ordering = vec![];
        }

        Ok(Arc::new(new_config))
    }
}
1283
1284impl Debug for FileScanConfig {
1285 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1286 write!(f, "FileScanConfig {{")?;
1287 write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1288
1289 write!(f, "statistics={:?}, ", self.statistics())?;
1290
1291 DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1292 write!(f, "}}")
1293 }
1294}
1295
1296impl DisplayAs for FileScanConfig {
1297 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1298 let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1299 let orderings = get_projected_output_ordering(self, &schema);
1300
1301 write!(f, "file_groups=")?;
1302 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1303
1304 if !schema.fields().is_empty() {
1305 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1306 }
1307
1308 if let Some(limit) = self.limit {
1309 write!(f, ", limit={limit}")?;
1310 }
1311
1312 display_orderings(f, &orderings)?;
1313
1314 if !self.constraints.is_empty() {
1315 write!(f, ", {}", self.constraints)?;
1316 }
1317
1318 Ok(())
1319 }
1320}
1321
1322fn ordered_column_indices_from_projection(
1326 projection: &ProjectionExprs,
1327) -> Option<Vec<usize>> {
1328 projection
1329 .expr_iter()
1330 .map(|e| {
1331 let index = e.as_any().downcast_ref::<Column>()?.index();
1332 Some(index)
1333 })
1334 .collect::<Option<Vec<usize>>>()
1335}
1336
1337fn is_ordering_valid_for_file_groups(
1348 file_groups: &[FileGroup],
1349 ordering: &LexOrdering,
1350 schema: &SchemaRef,
1351 projection: Option<&[usize]>,
1352) -> bool {
1353 file_groups.iter().all(|group| {
1354 if group.len() <= 1 {
1355 return true; }
1357 match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1358 {
1359 Ok(stats) => stats.is_sorted(),
1360 Err(_) => false, }
1362 })
1363}
1364
1365fn validate_orderings(
1368 orderings: &[LexOrdering],
1369 schema: &SchemaRef,
1370 file_groups: &[FileGroup],
1371 projection: Option<&[usize]>,
1372) -> Vec<LexOrdering> {
1373 orderings
1374 .iter()
1375 .filter(|ordering| {
1376 is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1377 })
1378 .cloned()
1379 .collect()
1380}
1381
/// Projects the declared output orderings onto `projected_schema` and keeps
/// only those the file groups can actually deliver.
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let projected_orderings =
        project_orderings(&base_config.output_ordering, projected_schema);

    // Three cases below:
    //   `None`             — no projection: validate against the full schema.
    //   `Some(Some(ix))`   — projection is pure column picks: validate using
    //                        those column indices.
    //   `Some(None)`       — projection contains computed expressions, so
    //                        min/max based validation is impossible.
    let indices = base_config
        .file_source
        .projection()
        .as_ref()
        .map(|p| ordered_column_indices_from_projection(p));

    match indices {
        Some(Some(indices)) => {
            validate_orderings(
                &projected_orderings,
                projected_schema,
                &base_config.file_groups,
                Some(indices.as_slice()),
            )
        }
        None => {
            validate_orderings(
                &projected_orderings,
                projected_schema,
                &base_config.file_groups,
                None,
            )
        }
        Some(None) => {
            // Without plain column indices we can only trust the orderings
            // when each group holds at most one file (trivially sorted).
            if base_config.file_groups.iter().all(|g| g.len() <= 1) {
                projected_orderings
            } else {
                debug!(
                    "Skipping specified output orderings. \
                    Some file groups couldn't be determined to be sorted: {:?}",
                    base_config.file_groups
                );
                vec![]
            }
        }
    }
}
1490
1491pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1502 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1503}
1504
1505pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1509 ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1510}
1511
1512#[cfg(test)]
1513mod tests {
1514 use std::collections::HashMap;
1515
1516 use super::*;
1517 use crate::TableSchema;
1518 use crate::test_util::col;
1519 use crate::{
1520 generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1521 verify_sort_integrity,
1522 };
1523
1524 use arrow::datatypes::Field;
1525 use datafusion_common::stats::Precision;
1526 use datafusion_common::{ColumnStatistics, internal_err};
1527 use datafusion_expr::{Operator, SortExpr};
1528 use datafusion_physical_expr::create_physical_sort_expr;
1529 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1530 use datafusion_physical_expr::projection::ProjectionExpr;
1531 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1532
    /// Test-only [`FileSource`] whose `try_pushdown_sort` always answers
    /// `Inexact` — presumably used to exercise the inexact sort-pushdown
    /// path in `FileScanConfig` (TODO confirm against the tests using it).
    #[derive(Clone)]
    struct InexactSortPushdownSource {
        // Backing store for the `FileSource::metrics` accessor.
        metrics: ExecutionPlanMetricsSet,
        table_schema: TableSchema,
    }
1538
1539 impl InexactSortPushdownSource {
1540 fn new(table_schema: TableSchema) -> Self {
1541 Self {
1542 metrics: ExecutionPlanMetricsSet::new(),
1543 table_schema,
1544 }
1545 }
1546 }
1547
    impl FileSource for InexactSortPushdownSource {
        // Never called in these tests; opening files is out of scope.
        fn create_file_opener(
            &self,
            _object_store: Arc<dyn object_store::ObjectStore>,
            _base_config: &FileScanConfig,
            _partition: usize,
        ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn table_schema(&self) -> &TableSchema {
            &self.table_schema
        }

        // Batch size is irrelevant here; return an unchanged clone.
        fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
            Arc::new(self.clone())
        }

        fn metrics(&self) -> &ExecutionPlanMetricsSet {
            &self.metrics
        }

        fn file_type(&self) -> &str {
            "mock"
        }

        /// Always reports an *inexact* pushdown, regardless of the order.
        fn try_pushdown_sort(
            &self,
            _order: &[PhysicalSortExpr],
            _eq_properties: &EquivalenceProperties,
        ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
            Ok(SortOrderPushdownResult::Inexact {
                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
            })
        }
    }
1588
1589 #[test]
1590 fn physical_plan_config_no_projection_tab_cols_as_field() {
1591 let file_schema = aggr_test_schema();
1592
1593 let table_partition_col =
1595 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1596 .with_metadata(HashMap::from_iter(vec![(
1597 "key_whatever".to_owned(),
1598 "value_whatever".to_owned(),
1599 )]));
1600
1601 let conf = config_for_projection(
1602 Arc::clone(&file_schema),
1603 None,
1604 Statistics::new_unknown(&file_schema),
1605 vec![table_partition_col.clone()],
1606 );
1607
1608 let proj_schema = conf.projected_schema().unwrap();
1610 assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1611 assert_eq!(
1612 *proj_schema.field(file_schema.fields().len()),
1613 table_partition_col,
1614 "partition columns are the last columns and ust have all values defined in created field"
1615 );
1616 }
1617
    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{ObjectMeta, path::Path};

        /// Compact description of an input file: a short name, a "date"
        /// partition value, and per-column min/max statistics for the file.
        /// Outer `None` = statistics entirely missing for that column;
        /// inner `None` = a null min or max bound.
        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            /// Constructor for files whose min/max bounds are all non-null.
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            /// Constructor allowing null (`None`) min and/or max bounds,
            /// used to model sort columns that contain nulls.
            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        /// One scenario: the input files, the requested sort, and either the
        /// expected grouping of file names or the expected error message.
        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        // Each case's `name` field describes the scenario being exercised.
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            // A null max bound sorts after all values when nulls are last.
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable(
                        "0",
                        "2023-01-01",
                        vec![Some((Some(0.00), Some(0.49)))],
                    ),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            // A null min bound sorts before all values when nulls are first.
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable(
                        "1",
                        "2023-01-01",
                        vec![Some((Some(0.50), Some(1.00)))],
                    ),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            // Missing statistics must fail with the full error chain below.
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err(
                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
                ),
            },
        ];

        for case in cases {
            // Table schema = file schema plus the appended "date" partition column.
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            // Convert the logical sort expressions into a physical LexOrdering.
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            // Uses the `impl From<File> for PartitionedFile` defined below.
            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            // Map every returned PartitionedFile back to its short name (the
            // path's file stem) by matching object metadata, so the groups can
            // be compared against `expected_result`.
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                // Errors are compared by message text; leak the String to get a
                // `&'static str` matching the expected_result type.
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        // Item declarations are scoped to the whole function body, so this
        // `impl` (used via `From::from` in the loop above) may follow `return`.
        impl From<File> for PartitionedFile {
            /// Builds a PartitionedFile at `data/date=<date>/<name>.parquet`
            /// carrying exact min/max column statistics from `File::statistics`.
            fn from(file: File) -> Self {
                let object_meta = ObjectMeta {
                    location: Path::from(format!(
                        "data/date={}/{}.parquet",
                        file.date, file.name
                    )),
                    last_modified: chrono::Utc.timestamp_nanos(0),
                    size: 0,
                    e_tag: None,
                    version: None,
                };
                let statistics = Arc::new(Statistics {
                    num_rows: Precision::Absent,
                    total_byte_size: Precision::Absent,
                    column_statistics: file
                        .statistics
                        .into_iter()
                        .map(|stats| {
                            stats
                                .map(|(min, max)| ColumnStatistics {
                                    min_value: Precision::Exact(ScalarValue::Float64(
                                        min,
                                    )),
                                    max_value: Precision::Exact(ScalarValue::Float64(
                                        max,
                                    )),
                                    ..Default::default()
                                })
                                // Outer None => completely unknown column stats.
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>(),
                });
                PartitionedFile::new_from_meta(object_meta)
                    .with_partition_values(vec![ScalarValue::from(file.date)])
                    .with_statistics(statistics)
            }
        }
    }
1929
1930 fn config_for_projection(
1932 file_schema: SchemaRef,
1933 projection: Option<Vec<usize>>,
1934 statistics: Statistics,
1935 table_partition_cols: Vec<Field>,
1936 ) -> FileScanConfig {
1937 let table_schema = TableSchema::new(
1938 file_schema,
1939 table_partition_cols.into_iter().map(Arc::new).collect(),
1940 );
1941 FileScanConfigBuilder::new(
1942 ObjectStoreUrl::parse("test:///").unwrap(),
1943 Arc::new(MockSource::new(table_schema.clone())),
1944 )
1945 .with_projection_indices(projection)
1946 .unwrap()
1947 .with_statistics(statistics)
1948 .build()
1949 }
1950
1951 #[test]
1952 fn test_file_scan_config_builder() {
1953 let file_schema = aggr_test_schema();
1954 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1955
1956 let table_schema = TableSchema::new(
1957 Arc::clone(&file_schema),
1958 vec![Arc::new(Field::new(
1959 "date",
1960 wrap_partition_type_in_dict(DataType::Utf8),
1961 false,
1962 ))],
1963 );
1964
1965 let file_source: Arc<dyn FileSource> =
1966 Arc::new(MockSource::new(table_schema.clone()));
1967
1968 let builder = FileScanConfigBuilder::new(
1970 object_store_url.clone(),
1971 Arc::clone(&file_source),
1972 );
1973
1974 let config = builder
1976 .with_limit(Some(1000))
1977 .with_projection_indices(Some(vec![0, 1]))
1978 .unwrap()
1979 .with_statistics(Statistics::new_unknown(&file_schema))
1980 .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1981 "test.parquet".to_string(),
1982 1024,
1983 )])])
1984 .with_output_ordering(vec![
1985 [PhysicalSortExpr::new_default(Arc::new(Column::new(
1986 "date", 0,
1987 )))]
1988 .into(),
1989 ])
1990 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1991 .build();
1992
1993 assert_eq!(config.object_store_url, object_store_url);
1995 assert_eq!(*config.file_schema(), file_schema);
1996 assert_eq!(config.limit, Some(1000));
1997 assert_eq!(
1998 config
1999 .file_source
2000 .projection()
2001 .as_ref()
2002 .map(|p| p.column_indices()),
2003 Some(vec![0, 1])
2004 );
2005 assert_eq!(config.table_partition_cols().len(), 1);
2006 assert_eq!(config.table_partition_cols()[0].name(), "date");
2007 assert_eq!(config.file_groups.len(), 1);
2008 assert_eq!(config.file_groups[0].len(), 1);
2009 assert_eq!(
2010 config.file_groups[0][0].object_meta.location.as_ref(),
2011 "test.parquet"
2012 );
2013 assert_eq!(
2014 config.file_compression_type,
2015 FileCompressionType::UNCOMPRESSED
2016 );
2017 assert_eq!(config.output_ordering.len(), 1);
2018 }
2019
2020 #[test]
2021 fn equivalence_properties_after_schema_change() {
2022 let file_schema = aggr_test_schema();
2023 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2024
2025 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2026
2027 let file_source: Arc<dyn FileSource> = Arc::new(
2029 MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
2030 col("c2", &file_schema).unwrap(),
2031 Operator::Eq,
2032 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2033 ))),
2034 );
2035
2036 let config = FileScanConfigBuilder::new(
2037 object_store_url.clone(),
2038 Arc::clone(&file_source),
2039 )
2040 .with_projection_indices(Some(vec![0, 1, 2]))
2041 .unwrap()
2042 .build();
2043
2044 let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
2047 col("c1", &file_schema).unwrap(),
2048 "c1",
2049 )]);
2050 let data_source = config
2051 .try_swapping_with_projection(&exprs)
2052 .unwrap()
2053 .unwrap();
2054
2055 let eq_properties = data_source.eq_properties();
2058 let eq_group = eq_properties.eq_group();
2059
2060 for class in eq_group.iter() {
2061 for expr in class.iter() {
2062 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
2063 assert_ne!(
2064 col.name(),
2065 "c2",
2066 "c2 should not be present in any equivalence class"
2067 );
2068 }
2069 }
2070 }
2071 }
2072
2073 #[test]
2074 fn test_file_scan_config_builder_defaults() {
2075 let file_schema = aggr_test_schema();
2076 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2077
2078 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2079
2080 let file_source: Arc<dyn FileSource> =
2081 Arc::new(MockSource::new(table_schema.clone()));
2082
2083 let config = FileScanConfigBuilder::new(
2085 object_store_url.clone(),
2086 Arc::clone(&file_source),
2087 )
2088 .build();
2089
2090 assert_eq!(config.object_store_url, object_store_url);
2092 assert_eq!(*config.file_schema(), file_schema);
2093 assert_eq!(config.limit, None);
2094 let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
2097 assert_eq!(
2098 config
2099 .file_source
2100 .projection()
2101 .as_ref()
2102 .map(|p| p.column_indices()),
2103 Some(expected_projection)
2104 );
2105 assert!(config.table_partition_cols().is_empty());
2106 assert!(config.file_groups.is_empty());
2107 assert_eq!(
2108 config.file_compression_type,
2109 FileCompressionType::UNCOMPRESSED
2110 );
2111 assert!(config.output_ordering.is_empty());
2112 assert!(config.constraints.is_empty());
2113
2114 assert_eq!(config.statistics().num_rows, Precision::Absent);
2116 assert_eq!(config.statistics().total_byte_size, Precision::Absent);
2117 assert_eq!(
2118 config.statistics().column_statistics.len(),
2119 file_schema.fields().len()
2120 );
2121 for stat in config.statistics().column_statistics {
2122 assert_eq!(stat.distinct_count, Precision::Absent);
2123 assert_eq!(stat.min_value, Precision::Absent);
2124 assert_eq!(stat.max_value, Precision::Absent);
2125 assert_eq!(stat.null_count, Precision::Absent);
2126 }
2127 }
2128
2129 #[test]
2130 fn test_file_scan_config_builder_new_from() {
2131 let schema = aggr_test_schema();
2132 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2133 let partition_cols = vec![Field::new(
2134 "date",
2135 wrap_partition_type_in_dict(DataType::Utf8),
2136 false,
2137 )];
2138 let file = PartitionedFile::new("test_file.parquet", 100);
2139
2140 let table_schema = TableSchema::new(
2141 Arc::clone(&schema),
2142 partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2143 );
2144
2145 let file_source: Arc<dyn FileSource> =
2146 Arc::new(MockSource::new(table_schema.clone()));
2147
2148 let original_config = FileScanConfigBuilder::new(
2150 object_store_url.clone(),
2151 Arc::clone(&file_source),
2152 )
2153 .with_projection_indices(Some(vec![0, 2]))
2154 .unwrap()
2155 .with_limit(Some(10))
2156 .with_file(file.clone())
2157 .with_constraints(Constraints::default())
2158 .build();
2159
2160 let new_builder = FileScanConfigBuilder::from(original_config);
2162
2163 let new_config = new_builder.build();
2165
2166 let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2168 assert_eq!(new_config.object_store_url, object_store_url);
2169 assert_eq!(*new_config.file_schema(), schema);
2170 assert_eq!(
2171 new_config
2172 .file_source
2173 .projection()
2174 .as_ref()
2175 .map(|p| p.column_indices()),
2176 Some(vec![0, 2])
2177 );
2178 assert_eq!(new_config.limit, Some(10));
2179 assert_eq!(*new_config.table_partition_cols(), partition_cols);
2180 assert_eq!(new_config.file_groups.len(), 1);
2181 assert_eq!(new_config.file_groups[0].len(), 1);
2182 assert_eq!(
2183 new_config.file_groups[0][0].object_meta.location.as_ref(),
2184 "test_file.parquet"
2185 );
2186 assert_eq!(new_config.constraints, Constraints::default());
2187 }
2188
    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        // Physical ascending ordering on "value"; shared by all cases below.
        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        /// One splitting scenario over generated files.
        struct TestCase {
            name: String,
            // Number of input files to generate.
            file_count: usize,
            // Degree of statistics-range overlap between generated files
            // (0.0 = disjoint) — interpreted by `generate_test_files`,
            // defined elsewhere in this module.
            overlap_factor: f64,
            // Requested number of output partitions.
            target_partitions: usize,
            // Partition count the split is expected to produce; may exceed
            // `target_partitions` when heavy overlap forces extra groups
            // (see the high-overlap case below).
            expected_partition_count: usize,
        }

        let test_cases = vec![
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            TestCase {
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            TestCase {
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                expected_partition_count: 3, },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                expected_partition_count: 1, },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                expected_partition_count: 0, },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            // Build files whose statistics overlap according to the factor.
            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            // `verify_sort_integrity` (defined elsewhere in this module)
            // checks files within each partition keep the proper ordering.
            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            // For multi-partition outputs, require a balanced distribution:
            // no partition may hold 2x (or more) the average file count.
            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        // target_partitions == 0 is invalid and must be rejected with a
        // descriptive error.
        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }
2342
2343 #[test]
2344 fn test_partition_statistics_projection() {
2345 use crate::source::DataSourceExec;
2351 use datafusion_physical_plan::ExecutionPlan;
2352
2353 let schema = Arc::new(Schema::new(vec![
2355 Field::new("col0", DataType::Int32, false),
2356 Field::new("col1", DataType::Int32, false),
2357 Field::new("col2", DataType::Int32, false),
2358 Field::new("col3", DataType::Int32, false),
2359 ]));
2360
2361 let file_group_stats = Statistics {
2363 num_rows: Precision::Exact(100),
2364 total_byte_size: Precision::Exact(1024),
2365 column_statistics: vec![
2366 ColumnStatistics {
2367 null_count: Precision::Exact(0),
2368 ..ColumnStatistics::new_unknown()
2369 },
2370 ColumnStatistics {
2371 null_count: Precision::Exact(5),
2372 ..ColumnStatistics::new_unknown()
2373 },
2374 ColumnStatistics {
2375 null_count: Precision::Exact(10),
2376 ..ColumnStatistics::new_unknown()
2377 },
2378 ColumnStatistics {
2379 null_count: Precision::Exact(15),
2380 ..ColumnStatistics::new_unknown()
2381 },
2382 ],
2383 };
2384
2385 let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2387 .with_statistics(Arc::new(file_group_stats));
2388
2389 let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2390
2391 let config = FileScanConfigBuilder::new(
2393 ObjectStoreUrl::parse("test:///").unwrap(),
2394 Arc::new(MockSource::new(table_schema.clone())),
2395 )
2396 .with_projection_indices(Some(vec![0, 2]))
2397 .unwrap() .with_file_groups(vec![file_group])
2399 .build();
2400
2401 let exec = DataSourceExec::from_data_source(config);
2403
2404 let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2406
2407 assert_eq!(
2409 partition_stats.column_statistics.len(),
2410 2,
2411 "Expected 2 column statistics (projected), but got {}",
2412 partition_stats.column_statistics.len()
2413 );
2414
2415 assert_eq!(
2417 partition_stats.column_statistics[0].null_count,
2418 Precision::Exact(0),
2419 "First projected column should be col0 with 0 nulls"
2420 );
2421 assert_eq!(
2422 partition_stats.column_statistics[1].null_count,
2423 Precision::Exact(10),
2424 "Second projected column should be col2 with 10 nulls"
2425 );
2426
2427 assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2429 assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2430 }
2431
2432 #[test]
2433 fn test_output_partitioning_not_partitioned_by_file_group() {
2434 let file_schema = aggr_test_schema();
2435 let partition_col =
2436 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2437
2438 let config = config_for_projection(
2439 Arc::clone(&file_schema),
2440 None,
2441 Statistics::new_unknown(&file_schema),
2442 vec![partition_col],
2443 );
2444
2445 let partitioning = config.output_partitioning();
2447 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2448 }
2449
2450 #[test]
2451 fn test_output_partitioning_no_partition_columns() {
2452 let file_schema = aggr_test_schema();
2453 let mut config = config_for_projection(
2454 Arc::clone(&file_schema),
2455 None,
2456 Statistics::new_unknown(&file_schema),
2457 vec![], );
2459 config.partitioned_by_file_group = true;
2460
2461 let partitioning = config.output_partitioning();
2462 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2463 }
2464
2465 #[test]
2466 fn test_output_partitioning_with_partition_columns() {
2467 let file_schema = aggr_test_schema();
2468
2469 let single_partition_col = vec![Field::new(
2471 "date",
2472 wrap_partition_type_in_dict(DataType::Utf8),
2473 false,
2474 )];
2475
2476 let mut config = config_for_projection(
2477 Arc::clone(&file_schema),
2478 None,
2479 Statistics::new_unknown(&file_schema),
2480 single_partition_col,
2481 );
2482 config.partitioned_by_file_group = true;
2483 config.file_groups = vec![
2484 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2485 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2486 FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2487 ];
2488
2489 let partitioning = config.output_partitioning();
2490 match partitioning {
2491 Partitioning::Hash(exprs, num_partitions) => {
2492 assert_eq!(num_partitions, 3);
2493 assert_eq!(exprs.len(), 1);
2494 assert_eq!(
2495 exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
2496 "date"
2497 );
2498 }
2499 _ => panic!("Expected Hash partitioning"),
2500 }
2501
2502 let multiple_partition_cols = vec![
2504 Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2505 Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2506 ];
2507
2508 config = config_for_projection(
2509 Arc::clone(&file_schema),
2510 None,
2511 Statistics::new_unknown(&file_schema),
2512 multiple_partition_cols,
2513 );
2514 config.partitioned_by_file_group = true;
2515 config.file_groups = vec![
2516 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2517 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2518 ];
2519
2520 let partitioning = config.output_partitioning();
2521 match partitioning {
2522 Partitioning::Hash(exprs, num_partitions) => {
2523 assert_eq!(num_partitions, 2);
2524 assert_eq!(exprs.len(), 2);
2525 let col_names: Vec<_> = exprs
2526 .iter()
2527 .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
2528 .collect();
2529 assert_eq!(col_names, vec!["year", "month"]);
2530 }
2531 _ => panic!("Expected Hash partitioning"),
2532 }
2533 }
2534
2535 #[test]
2536 fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
2537 -> Result<()> {
2538 let file_schema =
2539 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
2540
2541 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2542 let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2543
2544 let file_groups = vec![FileGroup::new(vec![
2545 PartitionedFile::new("file1", 1),
2546 PartitionedFile::new("file2", 1),
2547 ])];
2548
2549 let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2550 let config =
2551 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2552 .with_file_groups(file_groups)
2553 .with_output_ordering(vec![
2554 LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
2555 ])
2556 .build();
2557
2558 let requested_asc = vec![sort_expr_asc.clone()];
2559 let result = config.try_pushdown_sort(&requested_asc)?;
2560 let SortOrderPushdownResult::Inexact { inner } = result else {
2561 panic!("Expected Inexact result");
2562 };
2563 let pushed_config = inner
2564 .as_any()
2565 .downcast_ref::<FileScanConfig>()
2566 .expect("Expected FileScanConfig");
2567 let pushed_files = pushed_config.file_groups[0].files();
2568 assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
2569 assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
2570
2571 let requested_desc = vec![sort_expr_asc.reverse()];
2572 let result = config.try_pushdown_sort(&requested_desc)?;
2573 let SortOrderPushdownResult::Inexact { inner } = result else {
2574 panic!("Expected Inexact result");
2575 };
2576 let pushed_config = inner
2577 .as_any()
2578 .downcast_ref::<FileScanConfig>()
2579 .expect("Expected FileScanConfig");
2580 let pushed_files = pushed_config.file_groups[0].files();
2581 assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
2582 assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
2583
2584 Ok(())
2585 }
2586}