1use crate::file_groups::FileGroup;
22use crate::{
23 PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24 file_compression_type::FileCompressionType, file_stream::FileStream,
25 source::DataSource, statistics::MinMaxStatistics,
26};
27use arrow::datatypes::FieldRef;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::{
31 Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32};
33use datafusion_execution::{
34 SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
35};
36use datafusion_expr::Operator;
37
38use datafusion_physical_expr::equivalence::project_orderings;
39use datafusion_physical_expr::expressions::{BinaryExpr, Column};
40use datafusion_physical_expr::projection::ProjectionExprs;
41use datafusion_physical_expr::utils::reassign_expr_columns;
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
46use datafusion_physical_plan::SortOrderPushdownResult;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use datafusion_physical_plan::{
50 DisplayAs, DisplayFormatType,
51 display::{ProjectSchemaDisplay, display_orderings},
52 filter_pushdown::FilterPushdownPropagation,
53 metrics::ExecutionPlanMetricsSet,
54};
55use log::{debug, warn};
56use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
57
/// Describes a scan over a collection of files: where they live, how they
/// are grouped into output partitions, and how the embedded [`FileSource`]
/// should read them.
#[derive(Clone)]
pub struct FileScanConfig {
    /// URL of the object store holding the files; resolved against the
    /// runtime environment when a partition is opened.
    pub object_store_url: ObjectStoreUrl,
    /// Files to scan; each [`FileGroup`] is read by one output partition.
    pub file_groups: Vec<FileGroup>,
    /// Table constraints, propagated into the plan's equivalence properties.
    pub constraints: Constraints,
    /// Optional maximum number of rows this scan should produce.
    pub limit: Option<usize>,
    /// Declared output orderings; checked against per-file min/max
    /// statistics before being advertised (see `validated_output_ordering`).
    pub output_ordering: Vec<LexOrdering>,
    /// Compression applied to the files (defaults to uncompressed).
    pub file_compression_type: FileCompressionType,
    /// Format-specific reader (parquet, csv, ...); also carries the table
    /// schema and any pushed-down projection/filter.
    pub file_source: Arc<dyn FileSource>,
    /// Optional batch-size override; falls back to the session config.
    pub batch_size: Option<usize>,
    /// Optional factory for adapting physical expressions to per-file
    /// schemas.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    // Table-level statistics; reported as inexact when a filter has been
    // pushed down (see `FileScanConfig::statistics`).
    pub(crate) statistics: Statistics,
    /// When true, file groups align with table partition values: the scan
    /// advertises hash partitioning on the partition columns and refuses
    /// repartitioning.
    pub partitioned_by_file_group: bool,
}
183
/// Builder for [`FileScanConfig`]; fields left as `None` receive defaults
/// in [`FileScanConfigBuilder::build`].
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    // Defaults to `Constraints::default()` at build time.
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    // Defaults to unknown statistics for the table schema at build time.
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    // Defaults to `FileCompressionType::UNCOMPRESSED` at build time.
    file_compression_type: Option<FileCompressionType>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    partitioned_by_file_group: bool,
}
252
impl FileScanConfigBuilder {
    /// Creates a builder that scans `object_store_url` using `file_source`;
    /// every other setting starts empty / unset.
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        Self {
            object_store_url,
            file_source,
            file_groups: vec![],
            statistics: None,
            output_ordering: vec![],
            file_compression_type: None,
            limit: None,
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
            partitioned_by_file_group: false,
        }
    }

    /// Sets an optional limit on the number of rows to scan.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    /// Replaces the file source (e.g. with one that has a projection or
    /// filter pushed down).
    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source;
        self
    }

    /// Returns the full table schema carried by the file source.
    pub fn table_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().table_schema()
    }

    /// Applies a column-index projection; if the pushdown fails, logs a
    /// warning and returns the builder unchanged.
    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
        // Clone so the original builder survives a failed pushdown.
        match self.clone().with_projection_indices(indices) {
            Ok(builder) => builder,
            Err(e) => {
                warn!(
                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
                );
                self
            }
        }
    }

    /// Pushes a projection, given as column indices into the table schema,
    /// down into the file source.
    ///
    /// # Errors
    /// Returns an error if the source fails the pushdown or does not
    /// support projection pushdown at all.
    pub fn with_projection_indices(
        mut self,
        indices: Option<Vec<usize>>,
    ) -> Result<Self> {
        // `None` means "project everything": nothing to push down.
        let projection_exprs = indices.map(|indices| {
            ProjectionExprs::from_indices(
                &indices,
                self.file_source.table_schema().table_schema(),
            )
        });
        let Some(projection_exprs) = projection_exprs else {
            return Ok(self);
        };
        let new_source = self
            .file_source
            .try_pushdown_projection(&projection_exprs)
            .map_err(|e| {
                internal_datafusion_err!(
                    "Failed to push down projection in FileScanConfigBuilder::build: {e}"
                )
            })?;
        if let Some(new_source) = new_source {
            self.file_source = new_source;
        } else {
            // `internal_err!` builds an `Err`; `?` propagates it out.
            internal_err!(
                "FileSource {} does not support projection pushdown",
                self.file_source.file_type()
            )?;
        }
        Ok(self)
    }

    /// Sets the table constraints.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = Some(constraints);
        self
    }

    /// Sets the table-level statistics.
    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
    }

    /// Replaces all file groups; each group becomes one scan partition.
    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.file_groups = file_groups;
        self
    }

    /// Appends one file group.
    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }

    /// Appends a single file as its own group.
    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
        self.with_file_group(FileGroup::new(vec![partitioned_file]))
    }

    /// Declares the output orderings of the scanned data.
    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    /// Sets the file compression type.
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = Some(file_compression_type);
        self
    }

    /// Overrides the batch size (otherwise the session default is used).
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the factory used to adapt expressions to per-file schemas.
    pub fn with_expr_adapter(
        mut self,
        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    ) -> Self {
        self.expr_adapter_factory = expr_adapter;
        self
    }

    /// Declares whether file groups align with table partition values.
    pub fn with_partitioned_by_file_group(
        mut self,
        partitioned_by_file_group: bool,
    ) -> Self {
        self.partitioned_by_file_group = partitioned_by_file_group;
        self
    }

    /// Consumes the builder, filling unset options with defaults: empty
    /// constraints, unknown statistics for the table schema, and
    /// uncompressed files.
    pub fn build(self) -> FileScanConfig {
        let Self {
            object_store_url,
            file_source,
            limit,
            constraints,
            file_groups,
            statistics,
            output_ordering,
            file_compression_type,
            batch_size,
            expr_adapter_factory: expr_adapter,
            partitioned_by_file_group,
        } = self;

        let constraints = constraints.unwrap_or_default();
        let statistics = statistics.unwrap_or_else(|| {
            Statistics::new_unknown(file_source.table_schema().table_schema())
        });
        let file_compression_type =
            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);

        FileScanConfig {
            object_store_url,
            file_source,
            limit,
            constraints,
            file_groups,
            output_ordering,
            file_compression_type,
            batch_size,
            expr_adapter_factory: expr_adapter,
            statistics,
            partitioned_by_file_group,
        }
    }
}
485
486impl From<FileScanConfig> for FileScanConfigBuilder {
487 fn from(config: FileScanConfig) -> Self {
488 Self {
489 object_store_url: config.object_store_url,
490 file_source: Arc::<dyn FileSource>::clone(&config.file_source),
491 file_groups: config.file_groups,
492 statistics: Some(config.statistics),
493 output_ordering: config.output_ordering,
494 file_compression_type: Some(config.file_compression_type),
495 limit: config.limit,
496 constraints: Some(config.constraints),
497 batch_size: config.batch_size,
498 expr_adapter_factory: config.expr_adapter_factory,
499 partitioned_by_file_group: config.partitioned_by_file_group,
500 }
501 }
502}
503
impl DataSource for FileScanConfig {
    /// Opens `partition` and returns a stream over its file group's record
    /// batches.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        // An explicit override wins; otherwise fall back to the session's
        // configured batch size.
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self.file_source.with_batch_size(batch_size);

        let opener = source.create_file_opener(object_store, self, partition)?;

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        // Wrap in `cooperative` so the stream periodically yields control.
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Renders the scan for `EXPLAIN` output in the requested format.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                // `fmt` has no error payload; map schema failures to fmt::Error.
                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    if let Some(projection) = self.file_source.projection() {
                        // Render each projected expression: bare columns by
                        // name, anything else as "expr as alias".
                        let expr: Vec<String> = projection
                            .as_ref()
                            .iter()
                            .map(|proj_expr| {
                                if let Some(column) =
                                    proj_expr.expr.as_any().downcast_ref::<Column>()
                                {
                                    if column.name() == proj_expr.alias {
                                        column.name().to_string()
                                    } else {
                                        format!(
                                            "{} as {}",
                                            proj_expr.expr, proj_expr.alias
                                        )
                                    }
                                } else {
                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
                                }
                            })
                            .collect();
                        write!(f, ", projection=[{}]", expr.join(", "))?;
                    } else {
                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                    }
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    /// Asks the file source to redistribute files across up to
    /// `target_partitions` partitions; `None` means the current grouping is
    /// kept.
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        // Repartitioning would break the alignment between file groups and
        // table partition values.
        if self.partitioned_by_file_group {
            return Ok(None);
        }

        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    /// Advertises hash partitioning on the table partition columns when
    /// file groups align with partition values and every partition column
    /// survives the projection; otherwise unknown partitioning.
    fn output_partitioning(&self) -> Partitioning {
        if self.partitioned_by_file_group {
            let partition_cols = self.table_partition_cols();
            if !partition_cols.is_empty() {
                let projected_schema = match self.projected_schema() {
                    Ok(schema) => schema,
                    Err(_) => {
                        debug!(
                            "Could not get projected schema, falling back to UnknownPartitioning."
                        );
                        return Partitioning::UnknownPartitioning(self.file_groups.len());
                    }
                };

                // Resolve each partition column to its index in the
                // projected schema; columns dropped by the projection will
                // simply be missing here.
                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
                for partition_col in partition_cols {
                    if let Some((idx, _)) = projected_schema
                        .fields()
                        .iter()
                        .enumerate()
                        .find(|(_, f)| f.name() == partition_col.name())
                    {
                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
                    }
                }

                // Only claim hash partitioning if every partition column was
                // found in the projected schema.
                if exprs.len() == partition_cols.len() {
                    return Partitioning::Hash(exprs, self.file_groups.len());
                }
            }
        }
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    /// Builds equivalence properties from the validated orderings, table
    /// constraints, filter-implied equalities, and the projection (if any).
    fn eq_properties(&self) -> EquivalenceProperties {
        let schema = self.file_source.table_schema().table_schema();
        let mut eq_properties = EquivalenceProperties::new_with_orderings(
            Arc::clone(schema),
            self.validated_output_ordering(),
        )
        .with_constraints(self.constraints.clone());

        if let Some(filter) = self.file_source.filter() {
            // Equality conjuncts (`a = b`) in a pushed-down filter imply
            // equivalences the optimizer can exploit.
            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
                Ok(()) => {}
                Err(e) => {
                    // Best-effort in release builds; fail loudly in debug.
                    warn!("Failed to add filter equivalence info: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to add filter equivalence info: {e}");
                }
            }
        }

        if let Some(projection) = self.file_source.projection() {
            match (
                projection.project_schema(schema),
                projection.projection_mapping(schema),
            ) {
                (Ok(output_schema), Ok(mapping)) => {
                    eq_properties =
                        eq_properties.project(&mapping, Arc::new(output_schema));
                }
                (Err(e), _) | (_, Err(e)) => {
                    // Best-effort in release builds; fail loudly in debug.
                    warn!("Failed to project equivalence properties: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to project equivalence properties: {e}");
                }
            }
        }

        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }

    /// Returns statistics for one partition (from its file group) or for
    /// the whole scan, projected through any pushed-down projection.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(partition) = partition {
            if let Some(file_group) = self.file_groups.get(partition)
                && let Some(stat) = file_group.file_statistics(None)
            {
                let output_schema = self.projected_schema()?;
                return if let Some(projection) = self.file_source.projection() {
                    projection.project_statistics(stat.clone(), &output_schema)
                } else {
                    Ok(stat.clone())
                };
            }
            // No statistics recorded for this file group.
            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
        } else {
            let statistics = self.statistics();
            let projection = self.file_source.projection();
            let output_schema = self.projected_schema()?;
            if let Some(projection) = &projection {
                projection.project_statistics(statistics.clone(), &output_schema)
            } else {
                Ok(statistics)
            }
        }
    }

    /// Returns a copy of this scan with a different fetch limit.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    /// Attempts to absorb `projection` into the file source; `None` means
    /// the source cannot take it.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        match self.file_source.try_pushdown_projection(projection)? {
            Some(new_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_source;
                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
            }
            None => Ok(None),
        }
    }

    /// Rewrites `filters` into table-schema column references and offers
    /// them to the file source for pushdown.
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let table_schema = self.file_source.table_schema().table_schema();
        // Filters arrive in terms of the scan's output; if a projection has
        // been pushed down, they must first be mapped back through it.
        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
            use datafusion_physical_plan::projection::update_expr;
            filters
                .into_iter()
                .map(|filter| {
                    update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
                        internal_datafusion_err!(
                            "Failed to map filter expression through projection: {}",
                            filter
                        )
                    })
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            filters
        };
        // Re-anchor column references to their table-schema indices.
        let remapped_filters: Result<Vec<_>> = filters_to_remap
            .into_iter()
            .map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
            .collect();
        let remapped_filters = remapped_filters?;

        let result = self
            .file_source
            .try_pushdown_filters(remapped_filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_file_source;
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(new_file_scan_config) as _),
                })
            }
            None => {
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: None,
                })
            }
        }
    }

    /// Asks the file source whether it can satisfy `order` by reversing its
    /// output; on success rebuilds the config around the returned source
    /// (which also reverses each file group — see `rebuild_with_source`).
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        let pushdown_result = self
            .file_source
            .try_reverse_output(order, &self.eq_properties())?;

        match pushdown_result {
            SortOrderPushdownResult::Exact { inner } => {
                Ok(SortOrderPushdownResult::Exact {
                    inner: self.rebuild_with_source(inner, true)?,
                })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                Ok(SortOrderPushdownResult::Inexact {
                    inner: self.rebuild_with_source(inner, false)?,
                })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }
}
854
impl FileScanConfig {
    /// Returns only those declared orderings that per-file min/max
    /// statistics confirm actually hold across every file group.
    fn validated_output_ordering(&self) -> Vec<LexOrdering> {
        let schema = self.file_source.table_schema().table_schema();
        validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
    }

    /// Schema of the data physically stored in the files (excludes
    /// partition columns).
    pub fn file_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().file_schema()
    }

    /// Columns whose values come from the table partitioning rather than
    /// the file contents.
    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.file_source.table_schema().table_partition_cols()
    }

    /// Table statistics; downgraded to inexact when a filter has been
    /// pushed down, since filtering changes counts unpredictably.
    pub fn statistics(&self) -> Statistics {
        if self.file_source.filter().is_some() {
            self.statistics.clone().to_inexact()
        } else {
            self.statistics.clone()
        }
    }

    /// Output schema of the scan: the table schema run through the
    /// pushed-down projection, if any.
    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
        let schema = self.file_source.table_schema().table_schema();
        match self.file_source.projection() {
            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
            None => Ok(Arc::clone(schema)),
        }
    }

    /// Registers `a = b` conjuncts of `filter` as equivalences on
    /// `eq_properties`.
    fn add_filter_equivalence_info(
        filter: &Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
        schema: &Schema,
    ) -> Result<()> {
        // Split the filter into AND-ed conjuncts and keep only binary
        // equality predicates, with columns re-anchored to `schema`.
        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
            reassign_expr_columns(Arc::clone(expr), schema)
                .ok()
                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
                    Some(expr) if expr.op() == &Operator::Eq => {
                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
                    }
                    _ => None,
                })
        });

        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(lhs, rhs)?
        }

        Ok(())
    }

    /// Always `false` here; the real setting now lives on `CsvSource`.
    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }

    /// Constraints after projection, as computed by `eq_properties`.
    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn projected_constraints(&self) -> Constraints {
        let props = self.eq_properties();
        props.constraints().clone()
    }

    /// Projected column indices that fall within the file schema (i.e.
    /// excluding partition columns appended after it).
    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        #[expect(deprecated)]
        self.file_source.projection().as_ref().map(|p| {
            p.ordered_column_indices()
                .into_iter()
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }

    /// Distributes files into at most `target_partitions` groups such that
    /// files within a group do not overlap in `sort_order`, using a greedy
    /// pack: files are taken in ascending min-value order and placed into
    /// the least-filled compatible group.
    ///
    /// # Errors
    /// Fails if `target_partitions` is 0 or min/max statistics cannot be
    /// built for the files.
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(internal_datafusion_err!(
                "target_partitions must be greater than 0"
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        // Pre-seed exactly `target_partitions` (initially empty) groups.
        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            // A file fits a group if the group is empty or the file starts
            // strictly after the group's current last file ends; among the
            // fitting groups pick the one with the fewest files.
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                // No compatible group: start a new one (may exceed
                // `target_partitions`).
                file_groups_indices.push(vec![idx]);
            }
        }

        // Drop any of the pre-seeded groups that never received a file.
        file_groups_indices.retain(|group| !group.is_empty());

        // Materialize index groups back into `FileGroup`s.
        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

    /// Like `split_groups_by_statistics_with_target_partitions` but with no
    /// cap on the group count: each file (in ascending min-value order)
    /// joins the first existing group whose last file it does not overlap,
    /// or starts a new group.
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            // First-fit: a group is compatible when the candidate file
            // starts strictly after the group's last file ends.
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        // Materialize index groups back into `FileGroup`s.
        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    /// Appends the file-source description to `EXPLAIN` output.
    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    /// Returns the underlying file source.
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }

    /// Rebuilds this config around `new_file_source`, reversing the file
    /// order inside each group (used for reverse-order sort pushdown).
    /// When the reversal is only `Inexact` (`is_exact == false`), the
    /// declared output orderings are dropped since they no longer hold.
    fn rebuild_with_source(
        &self,
        new_file_source: Arc<dyn FileSource>,
        is_exact: bool,
    ) -> Result<Arc<dyn DataSource>> {
        let mut new_config = self.clone();

        // Reverse the files within each group so they are visited in the
        // opposite order.
        new_config.file_groups = new_config
            .file_groups
            .into_iter()
            .map(|group| {
                let mut files = group.into_inner();
                files.reverse();
                files.into()
            })
            .collect();

        new_config.file_source = new_file_source;

        if !is_exact {
            new_config.output_ordering = vec![];
        }

        Ok(Arc::new(new_config))
    }
}
1185
1186impl Debug for FileScanConfig {
1187 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1188 write!(f, "FileScanConfig {{")?;
1189 write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1190
1191 write!(f, "statistics={:?}, ", self.statistics())?;
1192
1193 DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1194 write!(f, "}}")
1195 }
1196}
1197
1198impl DisplayAs for FileScanConfig {
1199 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1200 let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1201 let orderings = get_projected_output_ordering(self, &schema);
1202
1203 write!(f, "file_groups=")?;
1204 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1205
1206 if !schema.fields().is_empty() {
1207 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1208 }
1209
1210 if let Some(limit) = self.limit {
1211 write!(f, ", limit={limit}")?;
1212 }
1213
1214 display_orderings(f, &orderings)?;
1215
1216 if !self.constraints.is_empty() {
1217 write!(f, ", {}", self.constraints)?;
1218 }
1219
1220 Ok(())
1221 }
1222}
1223
1224fn ordered_column_indices_from_projection(
1228 projection: &ProjectionExprs,
1229) -> Option<Vec<usize>> {
1230 projection
1231 .expr_iter()
1232 .map(|e| {
1233 let index = e.as_any().downcast_ref::<Column>()?.index();
1234 Some(index)
1235 })
1236 .collect::<Option<Vec<usize>>>()
1237}
1238
1239fn is_ordering_valid_for_file_groups(
1250 file_groups: &[FileGroup],
1251 ordering: &LexOrdering,
1252 schema: &SchemaRef,
1253 projection: Option<&[usize]>,
1254) -> bool {
1255 file_groups.iter().all(|group| {
1256 if group.len() <= 1 {
1257 return true; }
1259 match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1260 {
1261 Ok(stats) => stats.is_sorted(),
1262 Err(_) => false, }
1264 })
1265}
1266
1267fn validate_orderings(
1270 orderings: &[LexOrdering],
1271 schema: &SchemaRef,
1272 file_groups: &[FileGroup],
1273 projection: Option<&[usize]>,
1274) -> Vec<LexOrdering> {
1275 orderings
1276 .iter()
1277 .filter(|ordering| {
1278 is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1279 })
1280 .cloned()
1281 .collect()
1282}
1283
1284fn get_projected_output_ordering(
1344 base_config: &FileScanConfig,
1345 projected_schema: &SchemaRef,
1346) -> Vec<LexOrdering> {
1347 let projected_orderings =
1348 project_orderings(&base_config.output_ordering, projected_schema);
1349
1350 let indices = base_config
1351 .file_source
1352 .projection()
1353 .as_ref()
1354 .map(|p| ordered_column_indices_from_projection(p));
1355
1356 match indices {
1357 Some(Some(indices)) => {
1358 validate_orderings(
1360 &projected_orderings,
1361 projected_schema,
1362 &base_config.file_groups,
1363 Some(indices.as_slice()),
1364 )
1365 }
1366 None => {
1367 validate_orderings(
1369 &projected_orderings,
1370 projected_schema,
1371 &base_config.file_groups,
1372 None,
1373 )
1374 }
1375 Some(None) => {
1376 if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1380 projected_orderings
1381 } else {
1382 debug!(
1383 "Skipping specified output orderings. \
1384 Some file groups couldn't be determined to be sorted: {:?}",
1385 base_config.file_groups
1386 );
1387 vec![]
1388 }
1389 }
1390 }
1391}
1392
1393pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1404 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1405}
1406
1407pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1411 ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416 use std::collections::HashMap;
1417
1418 use super::*;
1419 use crate::TableSchema;
1420 use crate::test_util::col;
1421 use crate::{
1422 generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1423 verify_sort_integrity,
1424 };
1425
1426 use arrow::datatypes::Field;
1427 use datafusion_common::stats::Precision;
1428 use datafusion_common::{ColumnStatistics, internal_err};
1429 use datafusion_expr::{Operator, SortExpr};
1430 use datafusion_physical_expr::create_physical_sort_expr;
1431 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1432 use datafusion_physical_expr::projection::ProjectionExpr;
1433 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1434
1435 #[test]
1436 fn physical_plan_config_no_projection_tab_cols_as_field() {
1437 let file_schema = aggr_test_schema();
1438
1439 let table_partition_col =
1441 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1442 .with_metadata(HashMap::from_iter(vec![(
1443 "key_whatever".to_owned(),
1444 "value_whatever".to_owned(),
1445 )]));
1446
1447 let conf = config_for_projection(
1448 Arc::clone(&file_schema),
1449 None,
1450 Statistics::new_unknown(&file_schema),
1451 vec![table_partition_col.clone()],
1452 );
1453
1454 let proj_schema = conf.projected_schema().unwrap();
1456 assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1457 assert_eq!(
1458 *proj_schema.field(file_schema.fields().len()),
1459 table_partition_col,
1460 "partition columns are the last columns and ust have all values defined in created field"
1461 );
1462 }
1463
1464 #[test]
1465 fn test_split_groups_by_statistics() -> Result<()> {
1466 use chrono::TimeZone;
1467 use datafusion_common::DFSchema;
1468 use datafusion_expr::execution_props::ExecutionProps;
1469 use object_store::{ObjectMeta, path::Path};
1470
1471 struct File {
1472 name: &'static str,
1473 date: &'static str,
1474 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1475 }
1476 impl File {
1477 fn new(
1478 name: &'static str,
1479 date: &'static str,
1480 statistics: Vec<Option<(f64, f64)>>,
1481 ) -> Self {
1482 Self::new_nullable(
1483 name,
1484 date,
1485 statistics
1486 .into_iter()
1487 .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1488 .collect(),
1489 )
1490 }
1491
1492 fn new_nullable(
1493 name: &'static str,
1494 date: &'static str,
1495 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1496 ) -> Self {
1497 Self {
1498 name,
1499 date,
1500 statistics,
1501 }
1502 }
1503 }
1504
1505 struct TestCase {
1506 name: &'static str,
1507 file_schema: Schema,
1508 files: Vec<File>,
1509 sort: Vec<SortExpr>,
1510 expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1511 }
1512
1513 use datafusion_expr::col;
1514 let cases = vec![
1515 TestCase {
1516 name: "test sort",
1517 file_schema: Schema::new(vec![Field::new(
1518 "value".to_string(),
1519 DataType::Float64,
1520 false,
1521 )]),
1522 files: vec![
1523 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1524 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1525 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1526 ],
1527 sort: vec![col("value").sort(true, false)],
1528 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1529 },
1530 TestCase {
1533 name: "test sort with files ordered differently",
1534 file_schema: Schema::new(vec![Field::new(
1535 "value".to_string(),
1536 DataType::Float64,
1537 false,
1538 )]),
1539 files: vec![
1540 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1541 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1542 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1543 ],
1544 sort: vec![col("value").sort(true, false)],
1545 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1546 },
1547 TestCase {
1548 name: "reverse sort",
1549 file_schema: Schema::new(vec![Field::new(
1550 "value".to_string(),
1551 DataType::Float64,
1552 false,
1553 )]),
1554 files: vec![
1555 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1556 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1557 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1558 ],
1559 sort: vec![col("value").sort(false, true)],
1560 expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1561 },
1562 TestCase {
1563 name: "nullable sort columns, nulls last",
1564 file_schema: Schema::new(vec![Field::new(
1565 "value".to_string(),
1566 DataType::Float64,
1567 true,
1568 )]),
1569 files: vec![
1570 File::new_nullable(
1571 "0",
1572 "2023-01-01",
1573 vec![Some((Some(0.00), Some(0.49)))],
1574 ),
1575 File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1576 File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1577 ],
1578 sort: vec![col("value").sort(true, false)],
1579 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1580 },
1581 TestCase {
1582 name: "nullable sort columns, nulls first",
1583 file_schema: Schema::new(vec![Field::new(
1584 "value".to_string(),
1585 DataType::Float64,
1586 true,
1587 )]),
1588 files: vec![
1589 File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1590 File::new_nullable(
1591 "1",
1592 "2023-01-01",
1593 vec![Some((Some(0.50), Some(1.00)))],
1594 ),
1595 File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1596 ],
1597 sort: vec![col("value").sort(true, true)],
1598 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1599 },
1600 TestCase {
1601 name: "all three non-overlapping",
1602 file_schema: Schema::new(vec![Field::new(
1603 "value".to_string(),
1604 DataType::Float64,
1605 false,
1606 )]),
1607 files: vec![
1608 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1609 File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1610 File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1611 ],
1612 sort: vec![col("value").sort(true, false)],
1613 expected_result: Ok(vec![vec!["0", "1", "2"]]),
1614 },
1615 TestCase {
1616 name: "all three overlapping",
1617 file_schema: Schema::new(vec![Field::new(
1618 "value".to_string(),
1619 DataType::Float64,
1620 false,
1621 )]),
1622 files: vec![
1623 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1624 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1625 File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1626 ],
1627 sort: vec![col("value").sort(true, false)],
1628 expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1629 },
1630 TestCase {
1631 name: "empty input",
1632 file_schema: Schema::new(vec![Field::new(
1633 "value".to_string(),
1634 DataType::Float64,
1635 false,
1636 )]),
1637 files: vec![],
1638 sort: vec![col("value").sort(true, false)],
1639 expected_result: Ok(vec![]),
1640 },
1641 TestCase {
1642 name: "one file missing statistics",
1643 file_schema: Schema::new(vec![Field::new(
1644 "value".to_string(),
1645 DataType::Float64,
1646 false,
1647 )]),
1648 files: vec![
1649 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1650 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1651 File::new("2", "2023-01-02", vec![None]),
1652 ],
1653 sort: vec![col("value").sort(true, false)],
1654 expected_result: Err(
1655 "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1656 ),
1657 },
1658 ];
1659
1660 for case in cases {
1661 let table_schema = Arc::new(Schema::new(
1662 case.file_schema
1663 .fields()
1664 .clone()
1665 .into_iter()
1666 .cloned()
1667 .chain(Some(Arc::new(Field::new(
1668 "date".to_string(),
1669 DataType::Utf8,
1670 false,
1671 ))))
1672 .collect::<Vec<_>>(),
1673 ));
1674 let Some(sort_order) = LexOrdering::new(
1675 case.sort
1676 .into_iter()
1677 .map(|expr| {
1678 create_physical_sort_expr(
1679 &expr,
1680 &DFSchema::try_from(Arc::clone(&table_schema))?,
1681 &ExecutionProps::default(),
1682 )
1683 })
1684 .collect::<Result<Vec<_>>>()?,
1685 ) else {
1686 return internal_err!("This test should always use an ordering");
1687 };
1688
1689 let partitioned_files = FileGroup::new(
1690 case.files.into_iter().map(From::from).collect::<Vec<_>>(),
1691 );
1692 let result = FileScanConfig::split_groups_by_statistics(
1693 &table_schema,
1694 std::slice::from_ref(&partitioned_files),
1695 &sort_order,
1696 );
1697 let results_by_name = result
1698 .as_ref()
1699 .map(|file_groups| {
1700 file_groups
1701 .iter()
1702 .map(|file_group| {
1703 file_group
1704 .iter()
1705 .map(|file| {
1706 partitioned_files
1707 .iter()
1708 .find_map(|f| {
1709 if f.object_meta == file.object_meta {
1710 Some(
1711 f.object_meta
1712 .location
1713 .as_ref()
1714 .rsplit('/')
1715 .next()
1716 .unwrap()
1717 .trim_end_matches(".parquet"),
1718 )
1719 } else {
1720 None
1721 }
1722 })
1723 .unwrap()
1724 })
1725 .collect::<Vec<_>>()
1726 })
1727 .collect::<Vec<_>>()
1728 })
1729 .map_err(|e| e.strip_backtrace().leak() as &'static str);
1730
1731 assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1732 }
1733
1734 return Ok(());
1735
1736 impl From<File> for PartitionedFile {
1737 fn from(file: File) -> Self {
1738 PartitionedFile {
1739 object_meta: ObjectMeta {
1740 location: Path::from(format!(
1741 "data/date={}/{}.parquet",
1742 file.date, file.name
1743 )),
1744 last_modified: chrono::Utc.timestamp_nanos(0),
1745 size: 0,
1746 e_tag: None,
1747 version: None,
1748 },
1749 partition_values: vec![ScalarValue::from(file.date)],
1750 range: None,
1751 statistics: Some(Arc::new(Statistics {
1752 num_rows: Precision::Absent,
1753 total_byte_size: Precision::Absent,
1754 column_statistics: file
1755 .statistics
1756 .into_iter()
1757 .map(|stats| {
1758 stats
1759 .map(|(min, max)| ColumnStatistics {
1760 min_value: Precision::Exact(
1761 ScalarValue::Float64(min),
1762 ),
1763 max_value: Precision::Exact(
1764 ScalarValue::Float64(max),
1765 ),
1766 ..Default::default()
1767 })
1768 .unwrap_or_default()
1769 })
1770 .collect::<Vec<_>>(),
1771 })),
1772 extensions: None,
1773 metadata_size_hint: None,
1774 }
1775 }
1776 }
1777 }
1778
1779 fn config_for_projection(
1781 file_schema: SchemaRef,
1782 projection: Option<Vec<usize>>,
1783 statistics: Statistics,
1784 table_partition_cols: Vec<Field>,
1785 ) -> FileScanConfig {
1786 let table_schema = TableSchema::new(
1787 file_schema,
1788 table_partition_cols.into_iter().map(Arc::new).collect(),
1789 );
1790 FileScanConfigBuilder::new(
1791 ObjectStoreUrl::parse("test:///").unwrap(),
1792 Arc::new(MockSource::new(table_schema.clone())),
1793 )
1794 .with_projection_indices(projection)
1795 .unwrap()
1796 .with_statistics(statistics)
1797 .build()
1798 }
1799
1800 #[test]
1801 fn test_file_scan_config_builder() {
1802 let file_schema = aggr_test_schema();
1803 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1804
1805 let table_schema = TableSchema::new(
1806 Arc::clone(&file_schema),
1807 vec![Arc::new(Field::new(
1808 "date",
1809 wrap_partition_type_in_dict(DataType::Utf8),
1810 false,
1811 ))],
1812 );
1813
1814 let file_source: Arc<dyn FileSource> =
1815 Arc::new(MockSource::new(table_schema.clone()));
1816
1817 let builder = FileScanConfigBuilder::new(
1819 object_store_url.clone(),
1820 Arc::clone(&file_source),
1821 );
1822
1823 let config = builder
1825 .with_limit(Some(1000))
1826 .with_projection_indices(Some(vec![0, 1]))
1827 .unwrap()
1828 .with_statistics(Statistics::new_unknown(&file_schema))
1829 .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1830 "test.parquet".to_string(),
1831 1024,
1832 )])])
1833 .with_output_ordering(vec![
1834 [PhysicalSortExpr::new_default(Arc::new(Column::new(
1835 "date", 0,
1836 )))]
1837 .into(),
1838 ])
1839 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1840 .build();
1841
1842 assert_eq!(config.object_store_url, object_store_url);
1844 assert_eq!(*config.file_schema(), file_schema);
1845 assert_eq!(config.limit, Some(1000));
1846 assert_eq!(
1847 config
1848 .file_source
1849 .projection()
1850 .as_ref()
1851 .map(|p| p.column_indices()),
1852 Some(vec![0, 1])
1853 );
1854 assert_eq!(config.table_partition_cols().len(), 1);
1855 assert_eq!(config.table_partition_cols()[0].name(), "date");
1856 assert_eq!(config.file_groups.len(), 1);
1857 assert_eq!(config.file_groups[0].len(), 1);
1858 assert_eq!(
1859 config.file_groups[0][0].object_meta.location.as_ref(),
1860 "test.parquet"
1861 );
1862 assert_eq!(
1863 config.file_compression_type,
1864 FileCompressionType::UNCOMPRESSED
1865 );
1866 assert_eq!(config.output_ordering.len(), 1);
1867 }
1868
1869 #[test]
1870 fn equivalence_properties_after_schema_change() {
1871 let file_schema = aggr_test_schema();
1872 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1873
1874 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1875
1876 let file_source: Arc<dyn FileSource> = Arc::new(
1878 MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
1879 col("c2", &file_schema).unwrap(),
1880 Operator::Eq,
1881 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1882 ))),
1883 );
1884
1885 let config = FileScanConfigBuilder::new(
1886 object_store_url.clone(),
1887 Arc::clone(&file_source),
1888 )
1889 .with_projection_indices(Some(vec![0, 1, 2]))
1890 .unwrap()
1891 .build();
1892
1893 let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
1896 col("c1", &file_schema).unwrap(),
1897 "c1",
1898 )]);
1899 let data_source = config
1900 .try_swapping_with_projection(&exprs)
1901 .unwrap()
1902 .unwrap();
1903
1904 let eq_properties = data_source.eq_properties();
1907 let eq_group = eq_properties.eq_group();
1908
1909 for class in eq_group.iter() {
1910 for expr in class.iter() {
1911 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1912 assert_ne!(
1913 col.name(),
1914 "c2",
1915 "c2 should not be present in any equivalence class"
1916 );
1917 }
1918 }
1919 }
1920 }
1921
1922 #[test]
1923 fn test_file_scan_config_builder_defaults() {
1924 let file_schema = aggr_test_schema();
1925 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1926
1927 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1928
1929 let file_source: Arc<dyn FileSource> =
1930 Arc::new(MockSource::new(table_schema.clone()));
1931
1932 let config = FileScanConfigBuilder::new(
1934 object_store_url.clone(),
1935 Arc::clone(&file_source),
1936 )
1937 .build();
1938
1939 assert_eq!(config.object_store_url, object_store_url);
1941 assert_eq!(*config.file_schema(), file_schema);
1942 assert_eq!(config.limit, None);
1943 let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
1946 assert_eq!(
1947 config
1948 .file_source
1949 .projection()
1950 .as_ref()
1951 .map(|p| p.column_indices()),
1952 Some(expected_projection)
1953 );
1954 assert!(config.table_partition_cols().is_empty());
1955 assert!(config.file_groups.is_empty());
1956 assert_eq!(
1957 config.file_compression_type,
1958 FileCompressionType::UNCOMPRESSED
1959 );
1960 assert!(config.output_ordering.is_empty());
1961 assert!(config.constraints.is_empty());
1962
1963 assert_eq!(config.statistics().num_rows, Precision::Absent);
1965 assert_eq!(config.statistics().total_byte_size, Precision::Absent);
1966 assert_eq!(
1967 config.statistics().column_statistics.len(),
1968 file_schema.fields().len()
1969 );
1970 for stat in config.statistics().column_statistics {
1971 assert_eq!(stat.distinct_count, Precision::Absent);
1972 assert_eq!(stat.min_value, Precision::Absent);
1973 assert_eq!(stat.max_value, Precision::Absent);
1974 assert_eq!(stat.null_count, Precision::Absent);
1975 }
1976 }
1977
1978 #[test]
1979 fn test_file_scan_config_builder_new_from() {
1980 let schema = aggr_test_schema();
1981 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1982 let partition_cols = vec![Field::new(
1983 "date",
1984 wrap_partition_type_in_dict(DataType::Utf8),
1985 false,
1986 )];
1987 let file = PartitionedFile::new("test_file.parquet", 100);
1988
1989 let table_schema = TableSchema::new(
1990 Arc::clone(&schema),
1991 partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
1992 );
1993
1994 let file_source: Arc<dyn FileSource> =
1995 Arc::new(MockSource::new(table_schema.clone()));
1996
1997 let original_config = FileScanConfigBuilder::new(
1999 object_store_url.clone(),
2000 Arc::clone(&file_source),
2001 )
2002 .with_projection_indices(Some(vec![0, 2]))
2003 .unwrap()
2004 .with_limit(Some(10))
2005 .with_file(file.clone())
2006 .with_constraints(Constraints::default())
2007 .build();
2008
2009 let new_builder = FileScanConfigBuilder::from(original_config);
2011
2012 let new_config = new_builder.build();
2014
2015 let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2017 assert_eq!(new_config.object_store_url, object_store_url);
2018 assert_eq!(*new_config.file_schema(), schema);
2019 assert_eq!(
2020 new_config
2021 .file_source
2022 .projection()
2023 .as_ref()
2024 .map(|p| p.column_indices()),
2025 Some(vec![0, 2])
2026 );
2027 assert_eq!(new_config.limit, Some(10));
2028 assert_eq!(*new_config.table_partition_cols(), partition_cols);
2029 assert_eq!(new_config.file_groups.len(), 1);
2030 assert_eq!(new_config.file_groups[0].len(), 1);
2031 assert_eq!(
2032 new_config.file_groups[0][0].object_meta.location.as_ref(),
2033 "test_file.parquet"
2034 );
2035 assert_eq!(new_config.constraints, Constraints::default());
2036 }
2037
    /// Exercises `split_groups_by_statistics_with_target_partitions` across a
    /// range of file counts, statistics-overlap factors, and partition
    /// targets, then verifies that `target_partitions == 0` is rejected.
    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        // Single-column schema; every generated file carries stats on "value".
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        // Ascending sort on "value" — the ordering the split must preserve.
        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        struct TestCase {
            // Label used in assertion messages.
            name: String,
            // Number of files fed to the splitter.
            file_count: usize,
            // Fraction of value-range overlap between adjacent files
            // (interpretation defined by `generate_test_files`).
            overlap_factor: f64,
            // Requested number of output partitions.
            target_partitions: usize,
            // Partition count the splitter is expected to produce.
            expected_partition_count: usize,
        }

        let test_cases = vec![
            // Non-overlapping files can be spread evenly over the target.
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            // Heavy overlap forces more partitions than requested so that
            // files within each partition stay non-overlapping.
            TestCase {
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            TestCase {
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                // At most one partition per file.
                expected_partition_count: 3,
            },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                expected_partition_count: 1,
            },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                expected_partition_count: 0,
            },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            // The call under test.
            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            // Files inside each partition must remain sorted / non-overlapping.
            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            // Balance check: no partition may hold more than twice the
            // average number of files (only meaningful with >1 of each).
            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        // A target of zero partitions is invalid and must error out.
        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }
2191
2192 #[test]
2193 fn test_partition_statistics_projection() {
2194 use crate::source::DataSourceExec;
2200 use datafusion_physical_plan::ExecutionPlan;
2201
2202 let schema = Arc::new(Schema::new(vec![
2204 Field::new("col0", DataType::Int32, false),
2205 Field::new("col1", DataType::Int32, false),
2206 Field::new("col2", DataType::Int32, false),
2207 Field::new("col3", DataType::Int32, false),
2208 ]));
2209
2210 let file_group_stats = Statistics {
2212 num_rows: Precision::Exact(100),
2213 total_byte_size: Precision::Exact(1024),
2214 column_statistics: vec![
2215 ColumnStatistics {
2216 null_count: Precision::Exact(0),
2217 ..ColumnStatistics::new_unknown()
2218 },
2219 ColumnStatistics {
2220 null_count: Precision::Exact(5),
2221 ..ColumnStatistics::new_unknown()
2222 },
2223 ColumnStatistics {
2224 null_count: Precision::Exact(10),
2225 ..ColumnStatistics::new_unknown()
2226 },
2227 ColumnStatistics {
2228 null_count: Precision::Exact(15),
2229 ..ColumnStatistics::new_unknown()
2230 },
2231 ],
2232 };
2233
2234 let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2236 .with_statistics(Arc::new(file_group_stats));
2237
2238 let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2239
2240 let config = FileScanConfigBuilder::new(
2242 ObjectStoreUrl::parse("test:///").unwrap(),
2243 Arc::new(MockSource::new(table_schema.clone())),
2244 )
2245 .with_projection_indices(Some(vec![0, 2]))
2246 .unwrap() .with_file_groups(vec![file_group])
2248 .build();
2249
2250 let exec = DataSourceExec::from_data_source(config);
2252
2253 let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2255
2256 assert_eq!(
2258 partition_stats.column_statistics.len(),
2259 2,
2260 "Expected 2 column statistics (projected), but got {}",
2261 partition_stats.column_statistics.len()
2262 );
2263
2264 assert_eq!(
2266 partition_stats.column_statistics[0].null_count,
2267 Precision::Exact(0),
2268 "First projected column should be col0 with 0 nulls"
2269 );
2270 assert_eq!(
2271 partition_stats.column_statistics[1].null_count,
2272 Precision::Exact(10),
2273 "Second projected column should be col2 with 10 nulls"
2274 );
2275
2276 assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2278 assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2279 }
2280
2281 #[test]
2282 fn test_output_partitioning_not_partitioned_by_file_group() {
2283 let file_schema = aggr_test_schema();
2284 let partition_col =
2285 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2286
2287 let config = config_for_projection(
2288 Arc::clone(&file_schema),
2289 None,
2290 Statistics::new_unknown(&file_schema),
2291 vec![partition_col],
2292 );
2293
2294 let partitioning = config.output_partitioning();
2296 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2297 }
2298
2299 #[test]
2300 fn test_output_partitioning_no_partition_columns() {
2301 let file_schema = aggr_test_schema();
2302 let mut config = config_for_projection(
2303 Arc::clone(&file_schema),
2304 None,
2305 Statistics::new_unknown(&file_schema),
2306 vec![], );
2308 config.partitioned_by_file_group = true;
2309
2310 let partitioning = config.output_partitioning();
2311 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2312 }
2313
2314 #[test]
2315 fn test_output_partitioning_with_partition_columns() {
2316 let file_schema = aggr_test_schema();
2317
2318 let single_partition_col = vec![Field::new(
2320 "date",
2321 wrap_partition_type_in_dict(DataType::Utf8),
2322 false,
2323 )];
2324
2325 let mut config = config_for_projection(
2326 Arc::clone(&file_schema),
2327 None,
2328 Statistics::new_unknown(&file_schema),
2329 single_partition_col,
2330 );
2331 config.partitioned_by_file_group = true;
2332 config.file_groups = vec![
2333 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2334 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2335 FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2336 ];
2337
2338 let partitioning = config.output_partitioning();
2339 match partitioning {
2340 Partitioning::Hash(exprs, num_partitions) => {
2341 assert_eq!(num_partitions, 3);
2342 assert_eq!(exprs.len(), 1);
2343 assert_eq!(
2344 exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
2345 "date"
2346 );
2347 }
2348 _ => panic!("Expected Hash partitioning"),
2349 }
2350
2351 let multiple_partition_cols = vec![
2353 Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2354 Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2355 ];
2356
2357 config = config_for_projection(
2358 Arc::clone(&file_schema),
2359 None,
2360 Statistics::new_unknown(&file_schema),
2361 multiple_partition_cols,
2362 );
2363 config.partitioned_by_file_group = true;
2364 config.file_groups = vec![
2365 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2366 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2367 ];
2368
2369 let partitioning = config.output_partitioning();
2370 match partitioning {
2371 Partitioning::Hash(exprs, num_partitions) => {
2372 assert_eq!(num_partitions, 2);
2373 assert_eq!(exprs.len(), 2);
2374 let col_names: Vec<_> = exprs
2375 .iter()
2376 .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
2377 .collect();
2378 assert_eq!(col_names, vec!["year", "month"]);
2379 }
2380 _ => panic!("Expected Hash partitioning"),
2381 }
2382 }
2383}