use crate::file_groups::FileGroup;
use crate::{
    PartitionedFile, display::FileGroupsDisplay, file::FileSource,
    file_compression_type::FileCompressionType, file_stream::FileStream,
    source::DataSource, statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
    Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
};
use datafusion_execution::{
    SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
};
use datafusion_expr::Operator;

use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType,
    display::{ProjectSchemaDisplay, display_orderings},
    filter_pushdown::FilterPushdownPropagation,
    metrics::ExecutionPlanMetricsSet,
};
use log::{debug, warn};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};

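/// Describes a scan of a collection of files: which object store they live
/// in, how they are grouped into output partitions, and how the underlying
/// [`FileSource`] should read them.
///
/// Instances are normally assembled with [`FileScanConfigBuilder`]; see the
/// construction sketch on that type.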
#[derive(Clone)]
pub struct FileScanConfig {
    /// Object store URL, resolved against the runtime environment at execution time
    pub object_store_url: ObjectStoreUrl,
    /// Groups of files to scan; each group is read as one output partition
    pub file_groups: Vec<FileGroup>,
    /// Table constraints, propagated into the plan's equivalence properties
    pub constraints: Constraints,
    /// Optional cap on the number of rows to produce
    pub limit: Option<usize>,
    /// Orderings that the data within each file group is known to satisfy
    pub output_ordering: Vec<LexOrdering>,
    /// Compression applied to the files, if any
    pub file_compression_type: FileCompressionType,
    /// Format-specific reader (e.g. Parquet, CSV)
    pub file_source: Arc<dyn FileSource>,
    /// Optional batch size override; falls back to the session config
    pub batch_size: Option<usize>,
    /// Optional factory for adapting physical expressions to each file's schema
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Table-level statistics; read via [`Self::statistics`]
    pub(crate) statistics: Statistics,
    /// If true, every file group holds a single value of the partition
    /// columns, which allows reporting hash output partitioning
    pub partitioned_by_file_group: bool,
}

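/// Builder for [`FileScanConfig`].
///
/// A minimal construction sketch (the URL, file path, and `file_source`
/// below are placeholders, not values provided by this module):
///
/// ```ignore
/// let config = FileScanConfigBuilder::new(
///     ObjectStoreUrl::parse("test:///")?,
///     file_source, // some Arc<dyn FileSource>
/// )
/// .with_file(PartitionedFile::new("data.parquet", 1024))
/// .with_limit(Some(1000))
/// .build();
/// ```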
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    object_store_url: ObjectStoreUrl,
    file_source: Arc<dyn FileSource>,
    limit: Option<usize>,
    constraints: Option<Constraints>,
    file_groups: Vec<FileGroup>,
    statistics: Option<Statistics>,
    output_ordering: Vec<LexOrdering>,
    file_compression_type: Option<FileCompressionType>,
    batch_size: Option<usize>,
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    partitioned_by_file_group: bool,
}

impl FileScanConfigBuilder {
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        Self {
            object_store_url,
            file_source,
            file_groups: vec![],
            statistics: None,
            output_ordering: vec![],
            file_compression_type: None,
            limit: None,
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
            partitioned_by_file_group: false,
        }
    }

    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source;
        self
    }

    pub fn table_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().table_schema()
    }

    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
        match self.clone().with_projection_indices(indices) {
            Ok(builder) => builder,
            Err(e) => {
                warn!(
                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
                );
                self
            }
        }
    }

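    /// Applies a projection given as column indices into the table schema:
    /// the indices are converted to [`ProjectionExprs`] and pushed down into
    /// the file source. Errors if the source does not support projection
    /// pushdown.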
    pub fn with_projection_indices(
        mut self,
        indices: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projection_exprs = indices.map(|indices| {
            ProjectionExprs::from_indices(
                &indices,
                self.file_source.table_schema().table_schema(),
            )
        });
        let Some(projection_exprs) = projection_exprs else {
            return Ok(self);
        };
        let new_source = self
            .file_source
            .try_pushdown_projection(&projection_exprs)
            .map_err(|e| {
                internal_datafusion_err!(
                    "Failed to push down projection in FileScanConfigBuilder::with_projection_indices: {e}"
                )
            })?;
        if let Some(new_source) = new_source {
            self.file_source = new_source;
        } else {
            return internal_err!(
                "FileSource {} does not support projection pushdown",
                self.file_source.file_type()
            );
        }
        Ok(self)
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = Some(constraints);
        self
    }

    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
    }

    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.file_groups = file_groups;
        self
    }

    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }

    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
        self.with_file_group(FileGroup::new(vec![partitioned_file]))
    }

    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = Some(file_compression_type);
        self
    }

    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_expr_adapter(
        mut self,
        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    ) -> Self {
        self.expr_adapter_factory = expr_adapter;
        self
    }

    pub fn with_partitioned_by_file_group(
        mut self,
        partitioned_by_file_group: bool,
    ) -> Self {
        self.partitioned_by_file_group = partitioned_by_file_group;
        self
    }

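    /// Finalizes the builder, filling unset fields with defaults: empty
    /// constraints, unknown statistics over the table schema, and
    /// [`FileCompressionType::UNCOMPRESSED`].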
    pub fn build(self) -> FileScanConfig {
        let Self {
            object_store_url,
            file_source,
            limit,
            constraints,
            file_groups,
            statistics,
            output_ordering,
            file_compression_type,
            batch_size,
            expr_adapter_factory: expr_adapter,
            partitioned_by_file_group,
        } = self;

        let constraints = constraints.unwrap_or_default();
        let statistics = statistics.unwrap_or_else(|| {
            Statistics::new_unknown(file_source.table_schema().table_schema())
        });
        let file_compression_type =
            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);

        FileScanConfig {
            object_store_url,
            file_source,
            limit,
            constraints,
            file_groups,
            output_ordering,
            file_compression_type,
            batch_size,
            expr_adapter_factory: expr_adapter,
            statistics,
            partitioned_by_file_group,
        }
    }
}

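/// Converting a config back into a builder makes it cheap to tweak a single
/// field of an existing scan, e.g. (sketch):
/// `FileScanConfigBuilder::from(config).with_limit(Some(10)).build()`.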
impl From<FileScanConfig> for FileScanConfigBuilder {
    fn from(config: FileScanConfig) -> Self {
        Self {
            object_store_url: config.object_store_url,
            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
            file_groups: config.file_groups,
            statistics: Some(config.statistics),
            output_ordering: config.output_ordering,
            file_compression_type: Some(config.file_compression_type),
            limit: config.limit,
            constraints: Some(config.constraints),
            batch_size: config.batch_size,
            expr_adapter_factory: config.expr_adapter_factory,
            partitioned_by_file_group: config.partitioned_by_file_group,
        }
    }
}

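// Implementing `DataSource` lets a `FileScanConfig` serve as the leaf of a
// physical plan, typically wrapped as (sketch):
// `let exec = DataSourceExec::from_data_source(config);`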
impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
        let batch_size = self
            .batch_size
            .unwrap_or_else(|| context.session_config().batch_size());

        let source = self.file_source.with_batch_size(batch_size);

        let opener = source.create_file_opener(object_store, self, partition)?;

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        Ok(Box::pin(cooperative(stream)))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
                let orderings = get_projected_output_ordering(self, &schema);

                write!(f, "file_groups=")?;
                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

                if !schema.fields().is_empty() {
                    if let Some(projection) = self.file_source.projection() {
                        // Render plain column references as `name` and
                        // everything else as `expr as alias`
                        let expr: Vec<String> = projection
                            .as_ref()
                            .iter()
                            .map(|proj_expr| {
                                if let Some(column) =
                                    proj_expr.expr.as_any().downcast_ref::<Column>()
                                {
                                    if column.name() == proj_expr.alias {
                                        column.name().to_string()
                                    } else {
                                        format!(
                                            "{} as {}",
                                            proj_expr.expr, proj_expr.alias
                                        )
                                    }
                                } else {
                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
                                }
                            })
                            .collect();
                        write!(f, ", projection=[{}]", expr.join(", "))?;
                    } else {
                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
                    }
                }

                if let Some(limit) = self.limit {
                    write!(f, ", limit={limit}")?;
                }

                display_orderings(f, &orderings)?;

                if !self.constraints.is_empty() {
                    write!(f, ", {}", self.constraints)?;
                }

                self.fmt_file_source(t, f)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format={}", self.file_source.file_type())?;
                self.file_source.fmt_extra(t, f)?;
                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
                writeln!(f, "files={num_files}")?;
                Ok(())
            }
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        // Repartitioning could merge or split file groups, invalidating the
        // `partitioned_by_file_group` invariant, so bail out in that case
        if self.partitioned_by_file_group {
            return Ok(None);
        }

        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

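    // When `partitioned_by_file_group` is set and every table partition
    // column appears in the projected schema, the scan can advertise hash
    // partitioning on those columns, with one hash partition per file group;
    // otherwise the partitioning is unknown.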
    fn output_partitioning(&self) -> Partitioning {
        if self.partitioned_by_file_group {
            let partition_cols = self.table_partition_cols();
            if !partition_cols.is_empty() {
                let projected_schema = match self.projected_schema() {
                    Ok(schema) => schema,
                    Err(_) => {
                        debug!(
                            "Could not get projected schema, falling back to UnknownPartitioning."
                        );
                        return Partitioning::UnknownPartitioning(self.file_groups.len());
                    }
                };

                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
                for partition_col in partition_cols {
                    if let Some((idx, _)) = projected_schema
                        .fields()
                        .iter()
                        .enumerate()
                        .find(|(_, f)| f.name() == partition_col.name())
                    {
                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
                    }
                }

                if exprs.len() == partition_cols.len() {
                    return Partitioning::Hash(exprs, self.file_groups.len());
                }
            }
        }
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        let schema = self.file_source.table_schema().table_schema();
        let mut eq_properties = EquivalenceProperties::new_with_orderings(
            Arc::clone(schema),
            self.output_ordering.clone(),
        )
        .with_constraints(self.constraints.clone());

        if let Some(filter) = self.file_source.filter() {
            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
                Ok(()) => {}
                Err(e) => {
                    warn!("Failed to add filter equivalence info: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to add filter equivalence info: {e}");
                }
            }
        }

        if let Some(projection) = self.file_source.projection() {
            match (
                projection.project_schema(schema),
                projection.projection_mapping(schema),
            ) {
                (Ok(output_schema), Ok(mapping)) => {
                    eq_properties =
                        eq_properties.project(&mapping, Arc::new(output_schema));
                }
                (Err(e), _) | (_, Err(e)) => {
                    warn!("Failed to project equivalence properties: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to project equivalence properties: {e}");
                }
            }
        }

        eq_properties
    }

    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(partition) = partition {
            // Use the file group's pre-computed statistics when available
            if let Some(file_group) = self.file_groups.get(partition)
                && let Some(stat) = file_group.file_statistics(None)
            {
                let output_schema = self.projected_schema()?;
                return if let Some(projection) = self.file_source.projection() {
                    projection.project_statistics(stat.clone(), &output_schema)
                } else {
                    Ok(stat.clone())
                };
            }
            // No statistics known for this partition
            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
        } else {
            let statistics = self.statistics();
            let projection = self.file_source.projection();
            let output_schema = self.projected_schema()?;
            if let Some(projection) = &projection {
                projection.project_statistics(statistics.clone(), &output_schema)
            } else {
                Ok(statistics)
            }
        }
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = FileScanConfigBuilder::from(self.clone())
            .with_limit(limit)
            .build();
        Some(Arc::new(source))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        match self.file_source.try_pushdown_projection(projection)? {
            Some(new_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_source;
                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
            }
            None => Ok(None),
        }
    }

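    // Incoming filters refer to this node's output schema. If a projection
    // has been pushed into the file source, each filter is first rewritten
    // through that projection back to the table schema, and column indices
    // are then reassigned before the filters are offered to the file source.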
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        let table_schema = self.file_source.table_schema().table_schema();
        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
            use datafusion_physical_plan::projection::update_expr;
            filters
                .into_iter()
                .map(|filter| {
                    update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
                        internal_datafusion_err!(
                            "Failed to map filter expression through projection: {}",
                            filter
                        )
                    })
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            filters
        };
        let remapped_filters: Result<Vec<_>> = filters_to_remap
            .into_iter()
            .map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
            .collect();
        let remapped_filters = remapped_filters?;

        let result = self
            .file_source
            .try_pushdown_filters(remapped_filters, config)?;
        match result.updated_node {
            Some(new_file_source) => {
                let mut new_file_scan_config = self.clone();
                new_file_scan_config.file_source = new_file_source;
                Ok(FilterPushdownPropagation {
                    filters: result.filters,
                    updated_node: Some(Arc::new(new_file_scan_config) as _),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: result.filters,
                updated_node: None,
            }),
        }
    }

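    // Sort pushdown: ask the file source whether it can produce the requested
    // order (e.g. by reversing its per-file output) and, if so, rebuild the
    // config with the file order inside each group reversed to match.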
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        let pushdown_result = self
            .file_source
            .try_reverse_output(order, &self.eq_properties())?;

        match pushdown_result {
            SortOrderPushdownResult::Exact { inner } => {
                Ok(SortOrderPushdownResult::Exact {
                    inner: self.rebuild_with_source(inner, true)?,
                })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                Ok(SortOrderPushdownResult::Inexact {
                    inner: self.rebuild_with_source(inner, false)?,
                })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }
}

impl FileScanConfig {
    pub fn file_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().file_schema()
    }

    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.file_source.table_schema().table_partition_cols()
    }

    /// Table statistics, downgraded to inexact when a filter has been pushed
    /// into the file source (the scan may remove rows)
    pub fn statistics(&self) -> Statistics {
        if self.file_source.filter().is_some() {
            self.statistics.clone().to_inexact()
        } else {
            self.statistics.clone()
        }
    }

    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
        let schema = self.file_source.table_schema().table_schema();
        match self.file_source.projection() {
            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
            None => Ok(Arc::clone(schema)),
        }
    }

    /// Registers each `a = b` conjunct of `filter` as an equivalence so the
    /// optimizer can treat both sides as interchangeable
    fn add_filter_equivalence_info(
        filter: &Arc<dyn PhysicalExpr>,
        eq_properties: &mut EquivalenceProperties,
        schema: &Schema,
    ) -> Result<()> {
        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
            reassign_expr_columns(Arc::clone(expr), schema)
                .ok()
                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
                    Some(expr) if expr.op() == &Operator::Eq => {
                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
                    }
                    _ => None,
                })
        });

        for (lhs, rhs) in equal_pairs {
            eq_properties.add_equal_conditions(lhs, rhs)?
        }

        Ok(())
    }

    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }

    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn projected_constraints(&self) -> Constraints {
        let props = self.eq_properties();
        props.constraints().clone()
    }

    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        #[expect(deprecated)]
        self.file_source.projection().as_ref().map(|p| {
            p.ordered_column_indices()
                .into_iter()
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }

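    /// Redistributes the given files into groups such that each group, read
    /// in sequence, is non-decreasing on `sort_order` (judged by per-file
    /// min/max statistics), aiming for `target_partitions` groups.
    ///
    /// Files are visited in ascending order of their minimum values; each
    /// file is placed in the least-full group whose last file's maximum lies
    /// below the file's minimum. When no existing group fits, a new one is
    /// opened, so heavily overlapping files can produce more groups than
    /// requested.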
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(internal_datafusion_err!(
                "target_partitions must be greater than 0"
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        let indices_sorted_by_min = statistics.min_values_sorted();

        // Start with `target_partitions` empty groups
        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            // Prefer the least-full group that this file can extend without
            // breaking its sort order
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                // No existing group fits; overflow into a new one
                file_groups_indices.push(vec![idx]);
            }
        }

        // Drop any groups that stayed empty
        file_groups_indices.retain(|group| !group.is_empty());

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }

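    /// Like [`Self::split_groups_by_statistics_with_target_partitions`], but
    /// without a target: each file is appended to the first group it can
    /// extend without breaking the sort order, so the result has exactly as
    /// many groups as the overlap between files requires. Errors if any file
    /// lacks min/max statistics for the sort columns.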
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
    ) -> Result<Vec<FileGroup>> {
        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            // First-fit: take the first group whose last file ends before
            // this file begins
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }

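    /// Swaps in a file source produced by sort pushdown. The files within
    /// each group are reversed to match the source's reversed per-file
    /// output; when the resulting order is only inexact, the declared output
    /// ordering is cleared so downstream operators do not rely on it.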
    fn rebuild_with_source(
        &self,
        new_file_source: Arc<dyn FileSource>,
        is_exact: bool,
    ) -> Result<Arc<dyn DataSource>> {
        let mut new_config = self.clone();

        new_config.file_groups = new_config
            .file_groups
            .into_iter()
            .map(|group| {
                let mut files = group.into_inner();
                files.reverse();
                files.into()
            })
            .collect();

        new_config.file_source = new_file_source;

        if !is_exact {
            new_config.output_ordering = vec![];
        }

        Ok(Arc::new(new_config))
    }
}

impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        write!(f, "statistics={:?}, ", self.statistics())?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}

impl DisplayAs for FileScanConfig {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
        let orderings = get_projected_output_ordering(self, &schema);

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}

/// Returns the source column index of each projection expression, or `None`
/// if any expression is not a plain column reference
fn ordered_column_indices_from_projection(
    projection: &ProjectionExprs,
) -> Option<Vec<usize>> {
    projection
        .expr_iter()
        .map(|e| {
            let index = e.as_any().downcast_ref::<Column>()?.index();
            Some(index)
        })
        .collect::<Option<Vec<usize>>>()
}

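/// Projects the declared output orderings onto `projected_schema`, keeping an
/// ordering only if every file group with more than one file can be shown,
/// via per-file min/max statistics, to actually be sorted that way. Groups
/// whose statistics cannot be obtained are conservatively treated as
/// unsorted.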
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let projected_orderings =
        project_orderings(&base_config.output_ordering, projected_schema);

    let mut all_orderings = vec![];
    for new_ordering in projected_orderings {
        if base_config.file_groups.iter().any(|group| {
            // A group with zero or one file is trivially sorted
            if group.len() <= 1 {
                return false;
            }

            let Some(indices) = base_config
                .file_source
                .projection()
                .as_ref()
                .map(|p| ordered_column_indices_from_projection(p))
            else {
                return true;
            };

            let statistics = match MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                indices.as_deref(),
                group.iter(),
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    return true;
                }
            };

            !statistics.is_sorted()
        }) {
            debug!(
                "Skipping specified output ordering {:?}. \
                 Some file groups couldn't be determined to be sorted: {:?}",
                base_config.output_ordering[0], base_config.file_groups
            );
            continue;
        }

        all_orderings.push(new_ordering);
    }
    all_orderings
}

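/// Wraps a partition column's type in a `UInt16`-keyed dictionary. Partition
/// values repeat for every row read from a file, so dictionary encoding
/// stores each distinct value only once.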
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

/// Wraps a partition value in a dictionary scalar, matching the type
/// produced by [`wrap_partition_type_in_dict`]
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::TableSchema;
    use crate::test_util::col;
    use crate::{
        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
        verify_sort_integrity,
    };

    use arrow::datatypes::Field;
    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, internal_err};
    use datafusion_expr::{Operator, SortExpr};
    use datafusion_physical_expr::create_physical_sort_expr;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
    use datafusion_physical_expr::projection::ProjectionExpr;
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

    #[test]
    fn physical_plan_config_no_projection_tab_cols_as_field() {
        let file_schema = aggr_test_schema();

        let table_partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
                .with_metadata(HashMap::from_iter(vec![(
                    "key_whatever".to_owned(),
                    "value_whatever".to_owned(),
                )]));

        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![table_partition_col.clone()],
        );

        let proj_schema = conf.projected_schema().unwrap();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            *proj_schema.field(file_schema.fields().len()),
            table_partition_col,
            "partition columns are the last columns and must have all values defined in the created field"
        );
    }

    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{ObjectMeta, path::Path};

        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable(
                        "0",
                        "2023-01-01",
                        vec![Some((Some(0.00), Some(0.49)))],
                    ),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable(
                        "1",
                        "2023-01-01",
                        vec![Some((Some(0.50), Some(1.00)))],
                    ),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err(
                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
                ),
            },
        ];

        for case in cases {
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            // Map each resulting file back to its short name so groups can be
            // compared against `expected_result`
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                PartitionedFile {
                    object_meta: ObjectMeta {
                        location: Path::from(format!(
                            "data/date={}/{}.parquet",
                            file.date, file.name
                        )),
                        last_modified: chrono::Utc.timestamp_nanos(0),
                        size: 0,
                        e_tag: None,
                        version: None,
                    },
                    partition_values: vec![ScalarValue::from(file.date)],
                    range: None,
                    statistics: Some(Arc::new(Statistics {
                        num_rows: Precision::Absent,
                        total_byte_size: Precision::Absent,
                        column_statistics: file
                            .statistics
                            .into_iter()
                            .map(|stats| {
                                stats
                                    .map(|(min, max)| ColumnStatistics {
                                        min_value: Precision::Exact(
                                            ScalarValue::Float64(min),
                                        ),
                                        max_value: Precision::Exact(
                                            ScalarValue::Float64(max),
                                        ),
                                        ..Default::default()
                                    })
                                    .unwrap_or_default()
                            })
                            .collect::<Vec<_>>(),
                    })),
                    extensions: None,
                    metadata_size_hint: None,
                }
            }
        }
    }

    /// Builds a `FileScanConfig` over a `MockSource` with the given
    /// projection, statistics, and partition columns
    fn config_for_projection(
        file_schema: SchemaRef,
        projection: Option<Vec<usize>>,
        statistics: Statistics,
        table_partition_cols: Vec<Field>,
    ) -> FileScanConfig {
        let table_schema = TableSchema::new(
            file_schema,
            table_partition_cols.into_iter().map(Arc::new).collect(),
        );
        FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            Arc::new(MockSource::new(table_schema.clone())),
        )
        .with_projection_indices(projection)
        .unwrap()
        .with_statistics(statistics)
        .build()
    }

    #[test]
    fn test_file_scan_config_builder() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

        let table_schema = TableSchema::new(
            Arc::clone(&file_schema),
            vec![Arc::new(Field::new(
                "date",
                wrap_partition_type_in_dict(DataType::Utf8),
                false,
            ))],
        );

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        let builder = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        );

        let config = builder
            .with_limit(Some(1000))
            .with_projection_indices(Some(vec![0, 1]))
            .unwrap()
            .with_statistics(Statistics::new_unknown(&file_schema))
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "test.parquet".to_string(),
                1024,
            )])])
            .with_output_ordering(vec![
                [PhysicalSortExpr::new_default(Arc::new(Column::new(
                    "date", 0,
                )))]
                .into(),
            ])
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, Some(1000));
        assert_eq!(
            config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(vec![0, 1])
        );
        assert_eq!(config.table_partition_cols().len(), 1);
        assert_eq!(config.table_partition_cols()[0].name(), "date");
        assert_eq!(config.file_groups.len(), 1);
        assert_eq!(config.file_groups[0].len(), 1);
        assert_eq!(
            config.file_groups[0][0].object_meta.location.as_ref(),
            "test.parquet"
        );
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert_eq!(config.output_ordering.len(), 1);
    }

    #[test]
    fn equivalence_properties_after_schema_change() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);

        // Push a filter on c2 into the source, then project c2 away
        let file_source: Arc<dyn FileSource> = Arc::new(
            MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
                col("c2", &file_schema).unwrap(),
                Operator::Eq,
                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
            ))),
        );

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        )
        .with_projection_indices(Some(vec![0, 1, 2]))
        .unwrap()
        .build();

        let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
            col("c1", &file_schema).unwrap(),
            "c1",
        )]);
        let data_source = config
            .try_swapping_with_projection(&exprs)
            .unwrap()
            .unwrap();

        // The projected-away column must not leak into the equivalence classes
        let eq_properties = data_source.eq_properties();
        let eq_group = eq_properties.eq_group();

        for class in eq_group.iter() {
            for expr in class.iter() {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    assert_ne!(
                        col.name(),
                        "c2",
                        "c2 should not be present in any equivalence class"
                    );
                }
            }
        }
    }

    #[test]
    fn test_file_scan_config_builder_defaults() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        )
        .build();

        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, None);
        // With no explicit projection, all columns are projected
        let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
        assert_eq!(
            config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(expected_projection)
        );
        assert!(config.table_partition_cols().is_empty());
        assert!(config.file_groups.is_empty());
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(config.output_ordering.is_empty());
        assert!(config.constraints.is_empty());

        assert_eq!(config.statistics().num_rows, Precision::Absent);
        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
        assert_eq!(
            config.statistics().column_statistics.len(),
            file_schema.fields().len()
        );
        for stat in config.statistics().column_statistics {
            assert_eq!(stat.distinct_count, Precision::Absent);
            assert_eq!(stat.min_value, Precision::Absent);
            assert_eq!(stat.max_value, Precision::Absent);
            assert_eq!(stat.null_count, Precision::Absent);
        }
    }

    #[test]
    fn test_file_scan_config_builder_new_from() {
        let schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let partition_cols = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];
        let file = PartitionedFile::new("test_file.parquet", 100);

        let table_schema = TableSchema::new(
            Arc::clone(&schema),
            partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
        );

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        let original_config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        )
        .with_projection_indices(Some(vec![0, 2]))
        .unwrap()
        .with_limit(Some(10))
        .with_file(file.clone())
        .with_constraints(Constraints::default())
        .build();

        // Round-trip through the builder and verify nothing is lost
        let new_builder = FileScanConfigBuilder::from(original_config);

        let new_config = new_builder.build();

        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
        assert_eq!(new_config.object_store_url, object_store_url);
        assert_eq!(*new_config.file_schema(), schema);
        assert_eq!(
            new_config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(vec![0, 2])
        );
        assert_eq!(new_config.limit, Some(10));
        assert_eq!(*new_config.table_partition_cols(), partition_cols);
        assert_eq!(new_config.file_groups.len(), 1);
        assert_eq!(new_config.file_groups[0].len(), 1);
        assert_eq!(
            new_config.file_groups[0][0].object_meta.location.as_ref(),
            "test_file.parquet"
        );
        assert_eq!(new_config.constraints, Constraints::default());
    }

    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        struct TestCase {
            name: String,
            file_count: usize,
            overlap_factor: f64,
            target_partitions: usize,
            expected_partition_count: usize,
        }

        let test_cases = vec![
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            TestCase {
                // Heavy overlap forces more groups than the target
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            TestCase {
                // At most one partition per file
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                expected_partition_count: 3,
            },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                expected_partition_count: 1,
            },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                expected_partition_count: 0,
            },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        // target_partitions == 0 must be rejected
        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }

    #[test]
    fn test_partition_statistics_projection() {
        use crate::source::DataSourceExec;
        use datafusion_physical_plan::ExecutionPlan;

        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::Int32, false),
            Field::new("col1", DataType::Int32, false),
            Field::new("col2", DataType::Int32, false),
            Field::new("col3", DataType::Int32, false),
        ]));

        let file_group_stats = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Exact(1024),
            column_statistics: vec![
                ColumnStatistics {
                    null_count: Precision::Exact(0),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(5),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(10),
                    ..ColumnStatistics::new_unknown()
                },
                ColumnStatistics {
                    null_count: Precision::Exact(15),
                    ..ColumnStatistics::new_unknown()
                },
            ],
        };

        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
            .with_statistics(Arc::new(file_group_stats));

        let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);

        // Project col0 and col2 only
        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            Arc::new(MockSource::new(table_schema.clone())),
        )
        .with_projection_indices(Some(vec![0, 2]))
        .unwrap()
        .with_file_groups(vec![file_group])
        .build();

        let exec = DataSourceExec::from_data_source(config);

        let partition_stats = exec.partition_statistics(Some(0)).unwrap();

        assert_eq!(
            partition_stats.column_statistics.len(),
            2,
            "Expected 2 column statistics (projected), but got {}",
            partition_stats.column_statistics.len()
        );

        assert_eq!(
            partition_stats.column_statistics[0].null_count,
            Precision::Exact(0),
            "First projected column should be col0 with 0 nulls"
        );
        assert_eq!(
            partition_stats.column_statistics[1].null_count,
            Precision::Exact(10),
            "Second projected column should be col2 with 10 nulls"
        );

        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
    }

    #[test]
    fn test_output_partitioning_not_partitioned_by_file_group() {
        let file_schema = aggr_test_schema();
        let partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);

        let config = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![partition_col],
        );

        let partitioning = config.output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
    }

    #[test]
    fn test_output_partitioning_no_partition_columns() {
        let file_schema = aggr_test_schema();
        let mut config = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![], // no partition columns
        );
        config.partitioned_by_file_group = true;

        let partitioning = config.output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
    }

    #[test]
    fn test_output_partitioning_with_partition_columns() {
        let file_schema = aggr_test_schema();

        // Single partition column
        let single_partition_col = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];

        let mut config = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            single_partition_col,
        );
        config.partitioned_by_file_group = true;
        config.file_groups = vec![
            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
            FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
        ];

        let partitioning = config.output_partitioning();
        match partitioning {
            Partitioning::Hash(exprs, num_partitions) => {
                assert_eq!(num_partitions, 3);
                assert_eq!(exprs.len(), 1);
                assert_eq!(
                    exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
                    "date"
                );
            }
            _ => panic!("Expected Hash partitioning"),
        }

        // Multiple partition columns
        let multiple_partition_cols = vec![
            Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
            Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
        ];

        config = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            multiple_partition_cols,
        );
        config.partitioned_by_file_group = true;
        config.file_groups = vec![
            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
        ];

        let partitioning = config.output_partitioning();
        match partitioning {
            Partitioning::Hash(exprs, num_partitions) => {
                assert_eq!(num_partitions, 2);
                assert_eq!(exprs.len(), 2);
                let col_names: Vec<_> = exprs
                    .iter()
                    .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
                    .collect();
                assert_eq!(col_names, vec!["year", "month"]);
            }
            _ => panic!("Expected Hash partitioning"),
        }
    }
}