1pub(crate) mod sort_pushdown;
22
23use crate::file_groups::FileGroup;
24use crate::{
25 PartitionedFile, display::FileGroupsDisplay, file::FileSource,
26 file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
27 file_stream::work_source::SharedWorkSource, source::DataSource,
28 statistics::MinMaxStatistics,
29};
30use arrow::datatypes::FieldRef;
31use arrow::datatypes::{DataType, Schema, SchemaRef};
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::{
34 Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
35};
36use datafusion_execution::{
37 SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
38};
39use datafusion_expr::Operator;
40
41use crate::source::OpenArgs;
42use datafusion_physical_expr::expressions::{BinaryExpr, Column};
43use datafusion_physical_expr::projection::ProjectionExprs;
44use datafusion_physical_expr::utils::reassign_expr_columns;
45use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
46use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
47use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
48use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
49use datafusion_physical_plan::SortOrderPushdownResult;
50use datafusion_physical_plan::coop::cooperative;
51use datafusion_physical_plan::execution_plan::SchedulingType;
52use datafusion_physical_plan::{
53 DisplayAs, DisplayFormatType,
54 display::{ProjectSchemaDisplay, display_orderings},
55 filter_pushdown::FilterPushdownPropagation,
56 metrics::ExecutionPlanMetricsSet,
57};
58use log::{debug, warn};
59use std::any::Any;
60use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
61
/// Description of a file-based scan: which files to read, from which object
/// store, through which [`FileSource`], together with the scan-level settings
/// (limit, ordering, statistics, ...) that apply to it.
///
/// Instances are produced by [`FileScanConfigBuilder::build`]; an existing
/// config can be turned back into a builder with `FileScanConfigBuilder::from`.
#[derive(Clone)]
pub struct FileScanConfig {
    /// URL used to resolve the object store from the runtime environment when
    /// a partition is opened.
    pub object_store_url: ObjectStoreUrl,
    /// Files to scan, organized in groups. The number of groups is the
    /// partition count reported by `output_partitioning`.
    pub file_groups: Vec<FileGroup>,
    /// Constraints attached to the scan's equivalence properties.
    pub constraints: Constraints,
    /// Optional limit, exposed through `DataSource::fetch` and shown as
    /// `limit=` in the plan display. Presumably a maximum row count; where it
    /// is enforced is not visible in this file section.
    pub limit: Option<usize>,
    /// Order-sensitivity flag. `FileScanConfigBuilder::build` forces it to
    /// `true` whenever `output_ordering` is non-empty, and no shared sibling
    /// work source is created while it is set.
    pub preserve_order: bool,
    /// Declared output orderings. They are validated against the table schema
    /// and the file groups before being used for equivalence properties.
    pub output_ordering: Vec<LexOrdering>,
    /// Compression of the files; the builder defaults it to
    /// `FileCompressionType::UNCOMPRESSED`.
    pub file_compression_type: FileCompressionType,
    /// Format-specific source. It also owns the table schema and any
    /// projection or filter that has been pushed down into the scan.
    pub file_source: Arc<dyn FileSource>,
    /// Batch size override. When `None`, the session configuration's batch
    /// size is used at open time.
    pub batch_size: Option<usize>,
    /// Optional factory for physical expression adapters.
    /// NOTE(review): not consumed in the visible part of this file; confirm
    /// its use in the file source / opener code.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Unprojected statistics. Read them through
    /// [`FileScanConfig::statistics`], which marks them inexact when the file
    /// source carries a filter.
    pub(crate) statistics: Statistics,
    /// When `true`, `repartitioned` declines to repartition and
    /// `output_partitioning` may report hash partitioning on the table
    /// partition columns. Sibling work sharing is disabled as well.
    pub partitioned_by_file_group: bool,
}
210
/// Builder for [`FileScanConfig`].
///
/// Settings stored as `Option` are "not set yet" and are resolved to their
/// defaults by [`FileScanConfigBuilder::build`].
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    /// URL of the object store the files live in.
    object_store_url: ObjectStoreUrl,
    /// Format-specific source; also provides the table schema.
    file_source: Arc<dyn FileSource>,
    /// Optional limit copied verbatim into the built config.
    limit: Option<usize>,
    /// Requested order sensitivity; `build` also turns it on when an output
    /// ordering is present.
    preserve_order: bool,
    /// Constraints; `None` resolves to `Constraints::default()` in `build`.
    constraints: Option<Constraints>,
    /// File groups accumulated so far.
    file_groups: Vec<FileGroup>,
    /// Statistics; `None` resolves to unknown statistics for the table schema
    /// in `build`.
    statistics: Option<Statistics>,
    /// Declared output orderings.
    output_ordering: Vec<LexOrdering>,
    /// Compression type; `None` resolves to `UNCOMPRESSED` in `build`.
    file_compression_type: Option<FileCompressionType>,
    /// Optional batch size override.
    batch_size: Option<usize>,
    /// Optional physical expression adapter factory.
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Whether file groups are to be treated as the partitioning of the scan.
    partitioned_by_file_group: bool,
}
280
281impl FileScanConfigBuilder {
282 pub fn new(
289 object_store_url: ObjectStoreUrl,
290 file_source: Arc<dyn FileSource>,
291 ) -> Self {
292 Self {
293 object_store_url,
294 file_source,
295 file_groups: vec![],
296 statistics: None,
297 output_ordering: vec![],
298 file_compression_type: None,
299 limit: None,
300 preserve_order: false,
301 constraints: None,
302 batch_size: None,
303 expr_adapter_factory: None,
304 partitioned_by_file_group: false,
305 }
306 }
307
308 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
312 self.limit = limit;
313 self
314 }
315
316 pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
323 self.preserve_order = order_sensitive;
324 self
325 }
326
327 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
332 self.file_source = file_source;
333 self
334 }
335
336 pub fn table_schema(&self) -> &SchemaRef {
338 self.file_source.table_schema().table_schema()
339 }
340
341 #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
347 pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
348 match self.clone().with_projection_indices(indices) {
349 Ok(builder) => builder,
350 Err(e) => {
351 warn!(
352 "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
353 );
354 self
355 }
356 }
357 }
358
359 pub fn with_projection_indices(
368 mut self,
369 indices: Option<Vec<usize>>,
370 ) -> Result<Self> {
371 let projection_exprs = indices.map(|indices| {
372 ProjectionExprs::from_indices(
373 &indices,
374 self.file_source.table_schema().table_schema(),
375 )
376 });
377 let Some(projection_exprs) = projection_exprs else {
378 return Ok(self);
379 };
380 let new_source = self
381 .file_source
382 .try_pushdown_projection(&projection_exprs)
383 .map_err(|e| {
384 internal_datafusion_err!(
385 "Failed to push down projection in FileScanConfigBuilder::build: {e}"
386 )
387 })?;
388 if let Some(new_source) = new_source {
389 self.file_source = new_source;
390 } else {
391 internal_err!(
392 "FileSource {} does not support projection pushdown",
393 self.file_source.file_type()
394 )?;
395 }
396 Ok(self)
397 }
398
399 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
401 self.constraints = Some(constraints);
402 self
403 }
404
405 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
418 self.statistics = Some(statistics);
419 self
420 }
421
422 pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
432 self.file_groups = file_groups;
433 self
434 }
435
    /// Appends `file_group` after the file groups collected so far.
    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
        self.file_groups.push(file_group);
        self
    }
443
444 pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
448 self.with_file_group(FileGroup::new(vec![partitioned_file]))
449 }
450
451 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
460 self.output_ordering = output_ordering;
461 self
462 }
463
464 pub fn with_file_compression_type(
466 mut self,
467 file_compression_type: FileCompressionType,
468 ) -> Self {
469 self.file_compression_type = Some(file_compression_type);
470 self
471 }
472
473 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
475 self.batch_size = batch_size;
476 self
477 }
478
479 pub fn with_expr_adapter(
486 mut self,
487 expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
488 ) -> Self {
489 self.expr_adapter_factory = expr_adapter;
490 self
491 }
492
493 pub fn with_partitioned_by_file_group(
498 mut self,
499 partitioned_by_file_group: bool,
500 ) -> Self {
501 self.partitioned_by_file_group = partitioned_by_file_group;
502 self
503 }
504
505 pub fn build(self) -> FileScanConfig {
513 let Self {
514 object_store_url,
515 file_source,
516 limit,
517 preserve_order,
518 constraints,
519 file_groups,
520 statistics,
521 output_ordering,
522 file_compression_type,
523 batch_size,
524 expr_adapter_factory: expr_adapter,
525 partitioned_by_file_group,
526 } = self;
527
528 let constraints = constraints.unwrap_or_default();
529 let statistics = statistics.unwrap_or_else(|| {
530 Statistics::new_unknown(file_source.table_schema().table_schema())
531 });
532 let file_compression_type =
533 file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
534
535 let preserve_order = preserve_order || !output_ordering.is_empty();
537
538 FileScanConfig {
539 object_store_url,
540 file_source,
541 limit,
542 preserve_order,
543 constraints,
544 file_groups,
545 output_ordering,
546 file_compression_type,
547 batch_size,
548 expr_adapter_factory: expr_adapter,
549 statistics,
550 partitioned_by_file_group,
551 }
552 }
553}
554
555impl From<FileScanConfig> for FileScanConfigBuilder {
556 fn from(config: FileScanConfig) -> Self {
557 Self {
558 object_store_url: config.object_store_url,
559 file_source: Arc::<dyn FileSource>::clone(&config.file_source),
560 file_groups: config.file_groups,
561 statistics: Some(config.statistics),
562 output_ordering: config.output_ordering,
563 file_compression_type: Some(config.file_compression_type),
564 limit: config.limit,
565 preserve_order: config.preserve_order,
566 constraints: Some(config.constraints),
567 batch_size: config.batch_size,
568 expr_adapter_factory: config.expr_adapter_factory,
569 partitioned_by_file_group: config.partitioned_by_file_group,
570 }
571 }
572}
573
574impl DataSource for FileScanConfig {
575 fn open(
576 &self,
577 partition: usize,
578 context: Arc<TaskContext>,
579 ) -> Result<SendableRecordBatchStream> {
580 self.open_with_args(OpenArgs::new(partition, context))
581 }
582
583 fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
584 let OpenArgs {
585 partition,
586 context,
587 sibling_state,
588 } = args;
589 let object_store = context.runtime_env().object_store(&self.object_store_url)?;
590 let batch_size = self
591 .batch_size
592 .unwrap_or_else(|| context.session_config().batch_size());
593
594 let source = self.file_source.with_batch_size(batch_size);
595
596 let morselizer = source.create_morselizer(object_store, self, partition)?;
597
598 let shared_work_source = sibling_state
602 .as_ref()
603 .and_then(|state| state.downcast_ref::<SharedWorkSource>())
604 .cloned();
605
606 let stream = FileStreamBuilder::new(self)
607 .with_partition(partition)
608 .with_shared_work_source(shared_work_source)
609 .with_morselizer(morselizer)
610 .with_metrics(source.metrics())
611 .build()?;
612 Ok(Box::pin(cooperative(stream)))
613 }
614
615 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
616 match t {
617 DisplayFormatType::Default | DisplayFormatType::Verbose => {
618 let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
619 let orderings =
620 sort_pushdown::get_projected_output_ordering(self, &schema);
621
622 write!(f, "file_groups=")?;
623 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
624
625 if !schema.fields().is_empty() {
626 if let Some(projection) = self.file_source.projection() {
627 let expr: Vec<String> = projection
630 .as_ref()
631 .iter()
632 .map(|proj_expr| {
633 if let Some(column) =
634 proj_expr.expr.downcast_ref::<Column>()
635 {
636 if column.name() == proj_expr.alias {
637 column.name().to_string()
638 } else {
639 format!(
640 "{} as {}",
641 proj_expr.expr, proj_expr.alias
642 )
643 }
644 } else {
645 format!("{} as {}", proj_expr.expr, proj_expr.alias)
646 }
647 })
648 .collect();
649 write!(f, ", projection=[{}]", expr.join(", "))?;
650 } else {
651 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
652 }
653 }
654
655 if let Some(limit) = self.limit {
656 write!(f, ", limit={limit}")?;
657 }
658
659 display_orderings(f, &orderings)?;
660
661 if !self.constraints.is_empty() {
662 write!(f, ", {}", self.constraints)?;
663 }
664
665 self.fmt_file_source(t, f)
666 }
667 DisplayFormatType::TreeRender => {
668 writeln!(f, "format={}", self.file_source.file_type())?;
669 self.file_source.fmt_extra(t, f)?;
670 let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
671 writeln!(f, "files={num_files}")?;
672 Ok(())
673 }
674 }
675 }
676
677 fn repartitioned(
679 &self,
680 target_partitions: usize,
681 repartition_file_min_size: usize,
682 output_ordering: Option<LexOrdering>,
683 ) -> Result<Option<Arc<dyn DataSource>>> {
684 if self.partitioned_by_file_group {
688 return Ok(None);
689 }
690
691 let source = self.file_source.repartitioned(
692 target_partitions,
693 repartition_file_min_size,
694 output_ordering,
695 self,
696 )?;
697
698 Ok(source.map(|s| Arc::new(s) as _))
699 }
700
701 fn output_partitioning(&self) -> Partitioning {
718 if self.partitioned_by_file_group {
719 let partition_cols = self.table_partition_cols();
720 if !partition_cols.is_empty() {
721 let projected_schema = match self.projected_schema() {
722 Ok(schema) => schema,
723 Err(_) => {
724 debug!(
725 "Could not get projected schema, falling back to UnknownPartitioning."
726 );
727 return Partitioning::UnknownPartitioning(self.file_groups.len());
728 }
729 };
730
731 let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
734 for partition_col in partition_cols {
735 if let Some((idx, _)) = projected_schema
736 .fields()
737 .iter()
738 .enumerate()
739 .find(|(_, f)| f.name() == partition_col.name())
740 {
741 exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
742 }
743 }
744
745 if exprs.len() == partition_cols.len() {
746 return Partitioning::Hash(exprs, self.file_groups.len());
747 }
748 }
749 }
750 Partitioning::UnknownPartitioning(self.file_groups.len())
751 }
752
753 fn eq_properties(&self) -> EquivalenceProperties {
757 let schema = self.file_source.table_schema().table_schema();
758 let mut eq_properties = EquivalenceProperties::new_with_orderings(
759 Arc::clone(schema),
760 self.validated_output_ordering(),
761 )
762 .with_constraints(self.constraints.clone());
763
764 if let Some(filter) = self.file_source.filter() {
765 match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
768 Ok(()) => {}
769 Err(e) => {
770 warn!("Failed to add filter equivalence info: {e}");
771 #[cfg(debug_assertions)]
772 panic!("Failed to add filter equivalence info: {e}");
773 }
774 }
775 }
776
777 if let Some(projection) = self.file_source.projection() {
778 match (
779 projection.project_schema(schema),
780 projection.projection_mapping(schema),
781 ) {
782 (Ok(output_schema), Ok(mapping)) => {
783 eq_properties =
784 eq_properties.project(&mapping, Arc::new(output_schema));
785 }
786 (Err(e), _) | (_, Err(e)) => {
787 warn!("Failed to project equivalence properties: {e}");
788 #[cfg(debug_assertions)]
789 panic!("Failed to project equivalence properties: {e}");
790 }
791 }
792 }
793
794 eq_properties
795 }
796
    /// Always `SchedulingType::Cooperative`: the stream returned by
    /// `open_with_args` is wrapped with `cooperative`.
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }
800
801 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
802 if let Some(partition) = partition {
803 if let Some(file_group) = self.file_groups.get(partition)
806 && let Some(stat) = file_group.file_statistics(None)
807 {
808 let output_schema = self.projected_schema()?;
810 return if let Some(projection) = self.file_source.projection() {
811 Ok(Arc::new(
812 projection.project_statistics(stat.clone(), &output_schema)?,
813 ))
814 } else {
815 Ok(Arc::new(stat.clone()))
816 };
817 }
818 Ok(Arc::new(Statistics::new_unknown(
820 self.projected_schema()?.as_ref(),
821 )))
822 } else {
823 let statistics = self.statistics();
825 let projection = self.file_source.projection();
826 let output_schema = self.projected_schema()?;
827 if let Some(projection) = &projection {
828 Ok(Arc::new(
829 projection.project_statistics(statistics.clone(), &output_schema)?,
830 ))
831 } else {
832 Ok(Arc::new(statistics))
833 }
834 }
835 }
836
837 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
838 let source = FileScanConfigBuilder::from(self.clone())
839 .with_limit(limit)
840 .build();
841 Some(Arc::new(source))
842 }
843
    /// Returns the configured limit, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
847
    /// Returns a clone of the file source's metrics set.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }
851
852 fn try_swapping_with_projection(
853 &self,
854 projection: &ProjectionExprs,
855 ) -> Result<Option<Arc<dyn DataSource>>> {
856 match self.file_source.try_pushdown_projection(projection)? {
857 Some(new_source) => {
858 let mut new_file_scan_config = self.clone();
859 new_file_scan_config.file_source = new_source;
860 Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
861 }
862 None => Ok(None),
863 }
864 }
865
866 fn try_pushdown_filters(
867 &self,
868 filters: Vec<Arc<dyn PhysicalExpr>>,
869 config: &ConfigOptions,
870 ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
871 let table_schema = self.file_source.table_schema().table_schema();
880 let filters_to_remap = if let Some(projection) = self.file_source.projection() {
881 filters
882 .into_iter()
883 .map(|filter| projection.unproject_expr(&filter))
884 .collect::<Result<Vec<_>>>()?
885 } else {
886 filters
887 };
888 let remapped_filters = filters_to_remap
890 .into_iter()
891 .map(|filter| reassign_expr_columns(filter, table_schema))
892 .collect::<Result<Vec<_>>>()?;
893
894 let result = self
895 .file_source
896 .try_pushdown_filters(remapped_filters, config)?;
897 match result.updated_node {
898 Some(new_file_source) => {
899 let mut new_file_scan_config = self.clone();
900 new_file_scan_config.file_source = new_file_source;
901 Ok(FilterPushdownPropagation {
902 filters: result.filters,
903 updated_node: Some(Arc::new(new_file_scan_config) as _),
904 })
905 }
906 None => {
907 Ok(FilterPushdownPropagation {
909 filters: result.filters,
910 updated_node: None,
911 })
912 }
913 }
914 }
915
    /// Tries to make the scan produce its output in `order`.
    ///
    /// The request is first delegated to the file source, together with the
    /// scan's current equivalence properties. Its answer is then combined
    /// with a config rebuilt by `rebuild_with_source` (defined outside this
    /// section of the file):
    ///
    /// - `Exact` from the source stays `Exact` only if the rebuilt config
    ///   still declares an output ordering; otherwise it is reported as
    ///   `Inexact`.
    /// - `Inexact` from the source stays `Inexact` if the rebuilt config
    ///   declares no output ordering; otherwise the result is reported as
    ///   `Exact`, using the *original* file source.
    /// - `Unsupported` falls back to `try_sort_file_groups_by_statistics`.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        let pushdown_result = self
            .file_source
            .try_pushdown_sort(order, &self.eq_properties())?;

        match pushdown_result {
            SortOrderPushdownResult::Exact { inner } => {
                let config = self.rebuild_with_source(inner, true, order)?;
                // Without a declared output ordering on the rebuilt config
                // the exact guarantee is not reported; downgrade to Inexact.
                if config.output_ordering.is_empty() {
                    Ok(SortOrderPushdownResult::Inexact {
                        inner: Arc::new(config),
                    })
                } else {
                    Ok(SortOrderPushdownResult::Exact {
                        inner: Arc::new(config),
                    })
                }
            }
            SortOrderPushdownResult::Inexact { inner } => {
                let mut config = self.rebuild_with_source(inner, false, order)?;
                if config.output_ordering.is_empty() {
                    return Ok(SortOrderPushdownResult::Inexact {
                        inner: Arc::new(config),
                    });
                }
                // The rebuilt config still declares an ordering, so the
                // result is upgraded to Exact and the source's inexact
                // adaptation is discarded by restoring the original source.
                // NOTE(review): presumably the declared ordering plus the
                // rearranged file groups already guarantee `order`; confirm
                // against `rebuild_with_source`.
                config.file_source = Arc::clone(&self.file_source);
                Ok(SortOrderPushdownResult::Exact {
                    inner: Arc::new(config),
                })
            }
            SortOrderPushdownResult::Unsupported => {
                self.try_sort_file_groups_by_statistics(order)
            }
        }
    }
1024
1025 fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
1026 if self.preserve_order == preserve_order {
1027 return Some(Arc::new(self.clone()));
1028 }
1029
1030 let new_config = FileScanConfig {
1031 preserve_order,
1032 ..self.clone()
1033 };
1034 Some(Arc::new(new_config))
1035 }
1036
1037 fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
1044 if self.preserve_order || self.partitioned_by_file_group {
1045 return None;
1046 }
1047
1048 Some(Arc::new(SharedWorkSource::from_config(self)) as Arc<dyn Any + Send + Sync>)
1049 }
1050}
1051
1052impl FileScanConfig {
1053 fn validated_output_ordering(&self) -> Vec<LexOrdering> {
1083 let schema = self.file_source.table_schema().table_schema();
1084 sort_pushdown::validate_orderings(
1085 &self.output_ordering,
1086 schema,
1087 &self.file_groups,
1088 None,
1089 )
1090 }
1091
    /// Returns the file schema held by the file source's table schema.
    pub fn file_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().file_schema()
    }
1096
    /// Returns the table partition columns held by the file source's table
    /// schema.
    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.file_source.table_schema().table_partition_cols()
    }
1101
1102 pub fn statistics(&self) -> Statistics {
1108 if self.file_source.filter().is_some() {
1109 self.statistics.clone().to_inexact()
1110 } else {
1111 self.statistics.clone()
1112 }
1113 }
1114
1115 pub fn projected_schema(&self) -> Result<Arc<Schema>> {
1116 let schema = self.file_source.table_schema().table_schema();
1117 match self.file_source.projection() {
1118 Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
1119 None => Ok(Arc::clone(schema)),
1120 }
1121 }
1122
1123 fn add_filter_equivalence_info(
1124 filter: &Arc<dyn PhysicalExpr>,
1125 eq_properties: &mut EquivalenceProperties,
1126 schema: &Schema,
1127 ) -> Result<()> {
1128 let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
1130 reassign_expr_columns(Arc::clone(expr), schema)
1133 .ok()
1134 .and_then(|expr| match expr.downcast_ref::<BinaryExpr>() {
1135 Some(expr) if expr.op() == &Operator::Eq => {
1136 Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
1137 }
1138 _ => None,
1139 })
1140 });
1141
1142 for (lhs, rhs) in equal_pairs {
1143 eq_properties.add_equal_conditions(lhs, rhs)?
1144 }
1145
1146 Ok(())
1147 }
1148
    /// Deprecated stub kept for API compatibility; it always returns `false`
    /// and no longer reflects any configuration (see the deprecation note).
    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }
1164
1165 #[deprecated(
1166 since = "52.0.0",
1167 note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
1168 )]
1169 pub fn projected_constraints(&self) -> Constraints {
1170 let props = self.eq_properties();
1171 props.constraints().clone()
1172 }
1173
    /// Returns the projection's ordered column indices restricted to those
    /// that fall inside the file schema (indices at or beyond the number of
    /// file schema fields are dropped), or `None` when the file source has no
    /// projection.
    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        // The expression below is expected to use a deprecated API
        // (presumably `ordered_column_indices`).
        #[expect(deprecated)]
        self.file_source.projection().as_ref().map(|p| {
            p.ordered_column_indices()
                .into_iter()
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }
1187
1188 pub fn split_groups_by_statistics_with_target_partitions(
1210 table_schema: &SchemaRef,
1211 file_groups: &[FileGroup],
1212 sort_order: &LexOrdering,
1213 target_partitions: usize,
1214 ) -> Result<Vec<FileGroup>> {
1215 if target_partitions == 0 {
1216 return Err(internal_datafusion_err!(
1217 "target_partitions must be greater than 0"
1218 ));
1219 }
1220
1221 let flattened_files = file_groups
1222 .iter()
1223 .flat_map(FileGroup::iter)
1224 .collect::<Vec<_>>();
1225
1226 if flattened_files.is_empty() {
1227 return Ok(vec![]);
1228 }
1229
1230 let statistics = MinMaxStatistics::new_from_files(
1231 sort_order,
1232 table_schema,
1233 None,
1234 flattened_files.iter().copied(),
1235 )?;
1236
1237 let indices_sorted_by_min = statistics.min_values_sorted();
1238
1239 let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
1241
1242 for (idx, min) in indices_sorted_by_min {
1243 if let Some((_, group)) = file_groups_indices
1244 .iter_mut()
1245 .enumerate()
1246 .filter(|(_, group)| {
1247 group.is_empty()
1248 || min
1249 > statistics
1250 .max(*group.last().expect("groups should not be empty"))
1251 })
1252 .min_by_key(|(_, group)| group.len())
1253 {
1254 group.push(idx);
1255 } else {
1256 file_groups_indices.push(vec![idx]);
1258 }
1259 }
1260
1261 file_groups_indices.retain(|group| !group.is_empty());
1263
1264 Ok(file_groups_indices
1266 .into_iter()
1267 .map(|file_group_indices| {
1268 FileGroup::new(
1269 file_group_indices
1270 .into_iter()
1271 .map(|idx| flattened_files[idx].clone())
1272 .collect(),
1273 )
1274 })
1275 .collect())
1276 }
1277
1278 pub fn split_groups_by_statistics(
1282 table_schema: &SchemaRef,
1283 file_groups: &[FileGroup],
1284 sort_order: &LexOrdering,
1285 ) -> Result<Vec<FileGroup>> {
1286 let flattened_files = file_groups
1287 .iter()
1288 .flat_map(FileGroup::iter)
1289 .collect::<Vec<_>>();
1290 if flattened_files.is_empty() {
1302 return Ok(vec![]);
1303 }
1304
1305 let statistics = MinMaxStatistics::new_from_files(
1306 sort_order,
1307 table_schema,
1308 None,
1309 flattened_files.iter().copied(),
1310 )
1311 .map_err(|e| {
1312 e.context("construct min/max statistics for split_groups_by_statistics")
1313 })?;
1314
1315 let indices_sorted_by_min = statistics.min_values_sorted();
1316 let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1317
1318 for (idx, min) in indices_sorted_by_min {
1319 let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1320 min > statistics.max(
1323 *group
1324 .last()
1325 .expect("groups should be nonempty at construction"),
1326 )
1327 });
1328 match file_group_to_insert {
1329 Some(group) => group.push(idx),
1330 None => file_groups_indices.push(vec![idx]),
1331 }
1332 }
1333
1334 Ok(file_groups_indices
1336 .into_iter()
1337 .map(|file_group_indices| {
1338 file_group_indices
1339 .into_iter()
1340 .map(|idx| flattened_files[idx].clone())
1341 .collect()
1342 })
1343 .collect())
1344 }
1345
    /// Writes `, file_type=<type>` followed by whatever extra details the
    /// file source emits for display format `t`.
    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }
1351
    /// Returns the file source used by this scan.
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
1356
1357 }
1361
/// Debug output: the object store URL and the statistics, followed by the
/// verbose `DisplayAs` rendering, all wrapped in `FileScanConfig {...}`.
impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        // Uses the accessor, so the statistics shown are already downgraded
        // to inexact when the file source has a filter.
        write!(f, "statistics={:?}, ", self.statistics())?;

        // Both `DisplayAs` and `DataSource` provide `fmt_as`; name the trait
        // explicitly to pick the `DisplayAs` implementation.
        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}
1373
1374impl DisplayAs for FileScanConfig {
1375 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1376 let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1377 let orderings = sort_pushdown::get_projected_output_ordering(self, &schema);
1378
1379 write!(f, "file_groups=")?;
1380 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1381
1382 if !schema.fields().is_empty() {
1383 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1384 }
1385
1386 if let Some(limit) = self.limit {
1387 write!(f, ", limit={limit}")?;
1388 }
1389
1390 display_orderings(f, &orderings)?;
1391
1392 if !self.constraints.is_empty() {
1393 write!(f, ", {}", self.constraints)?;
1394 }
1395
1396 Ok(())
1397 }
1398}
1399
1400pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1411 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1412}
1413
1414pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1418 ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423 use std::collections::HashMap;
1424
1425 use super::*;
1426 use crate::TableSchema;
1427 use crate::source::DataSourceExec;
1428 use crate::test_util::col;
1429 use crate::{
1430 generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1431 verify_sort_integrity,
1432 };
1433
1434 use arrow::array::{Int32Array, RecordBatch};
1435 use arrow::datatypes::Field;
1436 use datafusion_common::ColumnStatistics;
1437 use datafusion_common::stats::Precision;
1438 use datafusion_common::{Result, assert_batches_eq, internal_err};
1439 use datafusion_execution::TaskContext;
1440 use datafusion_expr::SortExpr;
1441 use datafusion_physical_expr::create_physical_sort_expr;
1442 use datafusion_physical_expr::expressions::Literal;
1443 use datafusion_physical_expr::projection::ProjectionExpr;
1444 use datafusion_physical_expr::projection::ProjectionExprs;
1445 use datafusion_physical_plan::ExecutionPlan;
1446 use datafusion_physical_plan::execution_plan::collect;
1447 use futures::FutureExt as _;
1448 use futures::StreamExt as _;
1449 use futures::stream;
1450 use object_store::ObjectStore;
1451 use std::fmt::Debug;
1452
    /// Test-only [`FileSource`] whose `try_pushdown_sort` always answers
    /// `Inexact`.
    #[derive(Clone)]
    struct InexactSortPushdownSource {
        /// Metrics set handed out by `metrics()`.
        metrics: ExecutionPlanMetricsSet,
        /// Table schema handed out by `table_schema()`.
        table_schema: TableSchema,
    }
1458
1459 impl InexactSortPushdownSource {
1460 fn new(table_schema: TableSchema) -> Self {
1461 Self {
1462 metrics: ExecutionPlanMetricsSet::new(),
1463 table_schema,
1464 }
1465 }
1466 }
1467
    /// Minimal `FileSource` implementation: only the pieces needed to
    /// exercise sort pushdown are functional.
    impl FileSource for InexactSortPushdownSource {
        /// Not needed by the tests using this mock; calling it panics.
        fn create_file_opener(
            &self,
            _object_store: Arc<dyn ObjectStore>,
            _base_config: &FileScanConfig,
            _partition: usize,
        ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
            unimplemented!()
        }

        fn table_schema(&self) -> &TableSchema {
            &self.table_schema
        }

        /// The batch size is ignored; a plain clone is returned.
        fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
            Arc::new(self.clone())
        }

        fn metrics(&self) -> &ExecutionPlanMetricsSet {
            &self.metrics
        }

        fn file_type(&self) -> &str {
            "mock"
        }

        /// Answers `Inexact` with an unchanged clone of this source,
        /// whatever order is requested.
        fn try_pushdown_sort(
            &self,
            _order: &[PhysicalSortExpr],
            _eq_properties: &EquivalenceProperties,
        ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
            Ok(SortOrderPushdownResult::Inexact {
                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
            })
        }
    }
1504
1505 #[test]
1506 fn physical_plan_config_no_projection_tab_cols_as_field() {
1507 let file_schema = aggr_test_schema();
1508
1509 let table_partition_col =
1511 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1512 .with_metadata(HashMap::from_iter(vec![(
1513 "key_whatever".to_owned(),
1514 "value_whatever".to_owned(),
1515 )]));
1516
1517 let conf = config_for_projection(
1518 Arc::clone(&file_schema),
1519 None,
1520 Statistics::new_unknown(&file_schema),
1521 vec![table_partition_col.clone()],
1522 );
1523
1524 let proj_schema = conf.projected_schema().unwrap();
1526 assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1527 assert_eq!(
1528 *proj_schema.field(file_schema.fields().len()),
1529 table_partition_col,
1530 "partition columns are the last columns and ust have all values defined in created field"
1531 );
1532 }
1533
1534 #[test]
1535 fn test_split_groups_by_statistics() -> Result<()> {
1536 use chrono::TimeZone;
1537 use datafusion_common::DFSchema;
1538 use datafusion_expr::execution_props::ExecutionProps;
1539 use object_store::{ObjectMeta, path::Path};
1540
1541 struct File {
1542 name: &'static str,
1543 date: &'static str,
1544 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1545 }
1546 impl File {
1547 fn new(
1548 name: &'static str,
1549 date: &'static str,
1550 statistics: Vec<Option<(f64, f64)>>,
1551 ) -> Self {
1552 Self::new_nullable(
1553 name,
1554 date,
1555 statistics
1556 .into_iter()
1557 .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1558 .collect(),
1559 )
1560 }
1561
1562 fn new_nullable(
1563 name: &'static str,
1564 date: &'static str,
1565 statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1566 ) -> Self {
1567 Self {
1568 name,
1569 date,
1570 statistics,
1571 }
1572 }
1573 }
1574
1575 struct TestCase {
1576 name: &'static str,
1577 file_schema: Schema,
1578 files: Vec<File>,
1579 sort: Vec<SortExpr>,
1580 expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1581 }
1582
1583 use datafusion_expr::col;
1584 let cases = vec![
1585 TestCase {
1586 name: "test sort",
1587 file_schema: Schema::new(vec![Field::new(
1588 "value".to_string(),
1589 DataType::Float64,
1590 false,
1591 )]),
1592 files: vec![
1593 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1594 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1595 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1596 ],
1597 sort: vec![col("value").sort(true, false)],
1598 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1599 },
1600 TestCase {
1603 name: "test sort with files ordered differently",
1604 file_schema: Schema::new(vec![Field::new(
1605 "value".to_string(),
1606 DataType::Float64,
1607 false,
1608 )]),
1609 files: vec![
1610 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1611 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1612 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1613 ],
1614 sort: vec![col("value").sort(true, false)],
1615 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1616 },
1617 TestCase {
1618 name: "reverse sort",
1619 file_schema: Schema::new(vec![Field::new(
1620 "value".to_string(),
1621 DataType::Float64,
1622 false,
1623 )]),
1624 files: vec![
1625 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1626 File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1627 File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1628 ],
1629 sort: vec![col("value").sort(false, true)],
1630 expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1631 },
1632 TestCase {
1633 name: "nullable sort columns, nulls last",
1634 file_schema: Schema::new(vec![Field::new(
1635 "value".to_string(),
1636 DataType::Float64,
1637 true,
1638 )]),
1639 files: vec![
1640 File::new_nullable(
1641 "0",
1642 "2023-01-01",
1643 vec![Some((Some(0.00), Some(0.49)))],
1644 ),
1645 File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1646 File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1647 ],
1648 sort: vec![col("value").sort(true, false)],
1649 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1650 },
1651 TestCase {
1652 name: "nullable sort columns, nulls first",
1653 file_schema: Schema::new(vec![Field::new(
1654 "value".to_string(),
1655 DataType::Float64,
1656 true,
1657 )]),
1658 files: vec![
1659 File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1660 File::new_nullable(
1661 "1",
1662 "2023-01-01",
1663 vec![Some((Some(0.50), Some(1.00)))],
1664 ),
1665 File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1666 ],
1667 sort: vec![col("value").sort(true, true)],
1668 expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1669 },
1670 TestCase {
1671 name: "all three non-overlapping",
1672 file_schema: Schema::new(vec![Field::new(
1673 "value".to_string(),
1674 DataType::Float64,
1675 false,
1676 )]),
1677 files: vec![
1678 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1679 File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1680 File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1681 ],
1682 sort: vec![col("value").sort(true, false)],
1683 expected_result: Ok(vec![vec!["0", "1", "2"]]),
1684 },
1685 TestCase {
1686 name: "all three overlapping",
1687 file_schema: Schema::new(vec![Field::new(
1688 "value".to_string(),
1689 DataType::Float64,
1690 false,
1691 )]),
1692 files: vec![
1693 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1694 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1695 File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1696 ],
1697 sort: vec![col("value").sort(true, false)],
1698 expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1699 },
1700 TestCase {
1701 name: "empty input",
1702 file_schema: Schema::new(vec![Field::new(
1703 "value".to_string(),
1704 DataType::Float64,
1705 false,
1706 )]),
1707 files: vec![],
1708 sort: vec![col("value").sort(true, false)],
1709 expected_result: Ok(vec![]),
1710 },
1711 TestCase {
1712 name: "one file missing statistics",
1713 file_schema: Schema::new(vec![Field::new(
1714 "value".to_string(),
1715 DataType::Float64,
1716 false,
1717 )]),
1718 files: vec![
1719 File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1720 File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1721 File::new("2", "2023-01-02", vec![None]),
1722 ],
1723 sort: vec![col("value").sort(true, false)],
1724 expected_result: Err(
1725 "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1726 ),
1727 },
1728 ];
1729
1730 for case in cases {
1731 let table_schema = Arc::new(Schema::new(
1732 case.file_schema
1733 .fields()
1734 .clone()
1735 .into_iter()
1736 .cloned()
1737 .chain(Some(Arc::new(Field::new(
1738 "date".to_string(),
1739 DataType::Utf8,
1740 false,
1741 ))))
1742 .collect::<Vec<_>>(),
1743 ));
1744 let Some(sort_order) = LexOrdering::new(
1745 case.sort
1746 .into_iter()
1747 .map(|expr| {
1748 create_physical_sort_expr(
1749 &expr,
1750 &DFSchema::try_from(Arc::clone(&table_schema))?,
1751 &ExecutionProps::default(),
1752 )
1753 })
1754 .collect::<Result<Vec<_>>>()?,
1755 ) else {
1756 return internal_err!("This test should always use an ordering");
1757 };
1758
1759 let partitioned_files = FileGroup::new(
1760 case.files.into_iter().map(From::from).collect::<Vec<_>>(),
1761 );
1762 let result = FileScanConfig::split_groups_by_statistics(
1763 &table_schema,
1764 std::slice::from_ref(&partitioned_files),
1765 &sort_order,
1766 );
1767 let results_by_name = result
1768 .as_ref()
1769 .map(|file_groups| {
1770 file_groups
1771 .iter()
1772 .map(|file_group| {
1773 file_group
1774 .iter()
1775 .map(|file| {
1776 partitioned_files
1777 .iter()
1778 .find_map(|f| {
1779 if f.object_meta == file.object_meta {
1780 Some(
1781 f.object_meta
1782 .location
1783 .as_ref()
1784 .rsplit('/')
1785 .next()
1786 .unwrap()
1787 .trim_end_matches(".parquet"),
1788 )
1789 } else {
1790 None
1791 }
1792 })
1793 .unwrap()
1794 })
1795 .collect::<Vec<_>>()
1796 })
1797 .collect::<Vec<_>>()
1798 })
1799 .map_err(|e| e.strip_backtrace().leak() as &'static str);
1800
1801 assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1802 }
1803
1804 return Ok(());
1805
1806 impl From<File> for PartitionedFile {
1807 fn from(file: File) -> Self {
1808 let object_meta = ObjectMeta {
1809 location: Path::from(format!(
1810 "data/date={}/{}.parquet",
1811 file.date, file.name
1812 )),
1813 last_modified: chrono::Utc.timestamp_nanos(0),
1814 size: 0,
1815 e_tag: None,
1816 version: None,
1817 };
1818 let statistics = Arc::new(Statistics {
1819 num_rows: Precision::Absent,
1820 total_byte_size: Precision::Absent,
1821 column_statistics: file
1822 .statistics
1823 .into_iter()
1824 .map(|stats| {
1825 stats
1826 .map(|(min, max)| ColumnStatistics {
1827 min_value: Precision::Exact(ScalarValue::Float64(
1828 min,
1829 )),
1830 max_value: Precision::Exact(ScalarValue::Float64(
1831 max,
1832 )),
1833 ..Default::default()
1834 })
1835 .unwrap_or_default()
1836 })
1837 .collect::<Vec<_>>(),
1838 });
1839 PartitionedFile::new_from_meta(object_meta)
1840 .with_partition_values(vec![ScalarValue::from(file.date)])
1841 .with_statistics(statistics)
1842 }
1843 }
1844 }
1845
1846 fn config_for_projection(
1848 file_schema: SchemaRef,
1849 projection: Option<Vec<usize>>,
1850 statistics: Statistics,
1851 table_partition_cols: Vec<Field>,
1852 ) -> FileScanConfig {
1853 let table_schema = TableSchema::new(
1854 file_schema,
1855 table_partition_cols.into_iter().map(Arc::new).collect(),
1856 );
1857 FileScanConfigBuilder::new(
1858 ObjectStoreUrl::parse("test:///").unwrap(),
1859 Arc::new(MockSource::new(table_schema.clone())),
1860 )
1861 .with_projection_indices(projection)
1862 .unwrap()
1863 .with_statistics(statistics)
1864 .build()
1865 }
1866
1867 #[test]
1868 fn test_file_scan_config_builder() {
1869 let file_schema = aggr_test_schema();
1870 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1871
1872 let table_schema = TableSchema::new(
1873 Arc::clone(&file_schema),
1874 vec![Arc::new(Field::new(
1875 "date",
1876 wrap_partition_type_in_dict(DataType::Utf8),
1877 false,
1878 ))],
1879 );
1880
1881 let file_source: Arc<dyn FileSource> =
1882 Arc::new(MockSource::new(table_schema.clone()));
1883
1884 let builder = FileScanConfigBuilder::new(
1886 object_store_url.clone(),
1887 Arc::clone(&file_source),
1888 );
1889
1890 let config = builder
1892 .with_limit(Some(1000))
1893 .with_projection_indices(Some(vec![0, 1]))
1894 .unwrap()
1895 .with_statistics(Statistics::new_unknown(&file_schema))
1896 .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1897 "test.parquet".to_string(),
1898 1024,
1899 )])])
1900 .with_output_ordering(vec![
1901 [PhysicalSortExpr::new_default(Arc::new(Column::new(
1902 "date", 0,
1903 )))]
1904 .into(),
1905 ])
1906 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1907 .build();
1908
1909 assert_eq!(config.object_store_url, object_store_url);
1911 assert_eq!(*config.file_schema(), file_schema);
1912 assert_eq!(config.limit, Some(1000));
1913 assert_eq!(
1914 config
1915 .file_source
1916 .projection()
1917 .as_ref()
1918 .map(|p| p.column_indices()),
1919 Some(vec![0, 1])
1920 );
1921 assert_eq!(config.table_partition_cols().len(), 1);
1922 assert_eq!(config.table_partition_cols()[0].name(), "date");
1923 assert_eq!(config.file_groups.len(), 1);
1924 assert_eq!(config.file_groups[0].len(), 1);
1925 assert_eq!(
1926 config.file_groups[0][0].object_meta.location.as_ref(),
1927 "test.parquet"
1928 );
1929 assert_eq!(
1930 config.file_compression_type,
1931 FileCompressionType::UNCOMPRESSED
1932 );
1933 assert_eq!(config.output_ordering.len(), 1);
1934 }
1935
1936 #[test]
1937 fn equivalence_properties_after_schema_change() {
1938 let file_schema = aggr_test_schema();
1939 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1940
1941 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1942
1943 let file_source: Arc<dyn FileSource> = Arc::new(
1945 MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
1946 col("c2", &file_schema).unwrap(),
1947 Operator::Eq,
1948 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1949 ))),
1950 );
1951
1952 let config = FileScanConfigBuilder::new(
1953 object_store_url.clone(),
1954 Arc::clone(&file_source),
1955 )
1956 .with_projection_indices(Some(vec![0, 1, 2]))
1957 .unwrap()
1958 .build();
1959
1960 let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
1963 col("c1", &file_schema).unwrap(),
1964 "c1",
1965 )]);
1966 let data_source = config
1967 .try_swapping_with_projection(&exprs)
1968 .unwrap()
1969 .unwrap();
1970
1971 let eq_properties = data_source.eq_properties();
1974 let eq_group = eq_properties.eq_group();
1975
1976 for class in eq_group.iter() {
1977 for expr in class.iter() {
1978 if let Some(col) = expr.downcast_ref::<Column>() {
1979 assert_ne!(
1980 col.name(),
1981 "c2",
1982 "c2 should not be present in any equivalence class"
1983 );
1984 }
1985 }
1986 }
1987 }
1988
1989 #[test]
1990 fn test_file_scan_config_builder_defaults() {
1991 let file_schema = aggr_test_schema();
1992 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1993
1994 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1995
1996 let file_source: Arc<dyn FileSource> =
1997 Arc::new(MockSource::new(table_schema.clone()));
1998
1999 let config = FileScanConfigBuilder::new(
2001 object_store_url.clone(),
2002 Arc::clone(&file_source),
2003 )
2004 .build();
2005
2006 assert_eq!(config.object_store_url, object_store_url);
2008 assert_eq!(*config.file_schema(), file_schema);
2009 assert_eq!(config.limit, None);
2010 let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
2013 assert_eq!(
2014 config
2015 .file_source
2016 .projection()
2017 .as_ref()
2018 .map(|p| p.column_indices()),
2019 Some(expected_projection)
2020 );
2021 assert!(config.table_partition_cols().is_empty());
2022 assert!(config.file_groups.is_empty());
2023 assert_eq!(
2024 config.file_compression_type,
2025 FileCompressionType::UNCOMPRESSED
2026 );
2027 assert!(config.output_ordering.is_empty());
2028 assert!(config.constraints.is_empty());
2029
2030 assert_eq!(config.statistics().num_rows, Precision::Absent);
2032 assert_eq!(config.statistics().total_byte_size, Precision::Absent);
2033 assert_eq!(
2034 config.statistics().column_statistics.len(),
2035 file_schema.fields().len()
2036 );
2037 for stat in config.statistics().column_statistics {
2038 assert_eq!(stat.distinct_count, Precision::Absent);
2039 assert_eq!(stat.min_value, Precision::Absent);
2040 assert_eq!(stat.max_value, Precision::Absent);
2041 assert_eq!(stat.null_count, Precision::Absent);
2042 }
2043 }
2044
2045 #[test]
2046 fn test_file_scan_config_builder_new_from() {
2047 let schema = aggr_test_schema();
2048 let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2049 let partition_cols = vec![Field::new(
2050 "date",
2051 wrap_partition_type_in_dict(DataType::Utf8),
2052 false,
2053 )];
2054 let file = PartitionedFile::new("test_file.parquet", 100);
2055
2056 let table_schema = TableSchema::new(
2057 Arc::clone(&schema),
2058 partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2059 );
2060
2061 let file_source: Arc<dyn FileSource> =
2062 Arc::new(MockSource::new(table_schema.clone()));
2063
2064 let original_config = FileScanConfigBuilder::new(
2066 object_store_url.clone(),
2067 Arc::clone(&file_source),
2068 )
2069 .with_projection_indices(Some(vec![0, 2]))
2070 .unwrap()
2071 .with_limit(Some(10))
2072 .with_file(file.clone())
2073 .with_constraints(Constraints::default())
2074 .build();
2075
2076 let new_builder = FileScanConfigBuilder::from(original_config);
2078
2079 let new_config = new_builder.build();
2081
2082 let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2084 assert_eq!(new_config.object_store_url, object_store_url);
2085 assert_eq!(*new_config.file_schema(), schema);
2086 assert_eq!(
2087 new_config
2088 .file_source
2089 .projection()
2090 .as_ref()
2091 .map(|p| p.column_indices()),
2092 Some(vec![0, 2])
2093 );
2094 assert_eq!(new_config.limit, Some(10));
2095 assert_eq!(*new_config.table_partition_cols(), partition_cols);
2096 assert_eq!(new_config.file_groups.len(), 1);
2097 assert_eq!(new_config.file_groups[0].len(), 1);
2098 assert_eq!(
2099 new_config.file_groups[0][0].object_meta.location.as_ref(),
2100 "test_file.parquet"
2101 );
2102 assert_eq!(new_config.constraints, Constraints::default());
2103 }
2104
2105 #[test]
2106 fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2107 use datafusion_common::DFSchema;
2108 use datafusion_expr::{col, execution_props::ExecutionProps};
2109
2110 let schema = Arc::new(Schema::new(vec![Field::new(
2111 "value",
2112 DataType::Float64,
2113 false,
2114 )]));
2115
2116 let exec_props = ExecutionProps::new();
2118 let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2119 let sort_expr = [col("value").sort(true, false)];
2120 let sort_ordering = sort_expr
2121 .map(|expr| {
2122 create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2123 })
2124 .into();
2125
2126 struct TestCase {
2128 name: String,
2129 file_count: usize,
2130 overlap_factor: f64,
2131 target_partitions: usize,
2132 expected_partition_count: usize,
2133 }
2134
2135 let test_cases = vec![
2136 TestCase {
2138 name: "no_overlap_10_files_4_partitions".to_string(),
2139 file_count: 10,
2140 overlap_factor: 0.0,
2141 target_partitions: 4,
2142 expected_partition_count: 4,
2143 },
2144 TestCase {
2145 name: "medium_overlap_20_files_5_partitions".to_string(),
2146 file_count: 20,
2147 overlap_factor: 0.5,
2148 target_partitions: 5,
2149 expected_partition_count: 5,
2150 },
2151 TestCase {
2152 name: "high_overlap_30_files_3_partitions".to_string(),
2153 file_count: 30,
2154 overlap_factor: 0.8,
2155 target_partitions: 3,
2156 expected_partition_count: 7,
2157 },
2158 TestCase {
2160 name: "fewer_files_than_partitions".to_string(),
2161 file_count: 3,
2162 overlap_factor: 0.0,
2163 target_partitions: 10,
2164 expected_partition_count: 3, },
2166 TestCase {
2167 name: "single_file".to_string(),
2168 file_count: 1,
2169 overlap_factor: 0.0,
2170 target_partitions: 5,
2171 expected_partition_count: 1, },
2173 TestCase {
2174 name: "empty_files".to_string(),
2175 file_count: 0,
2176 overlap_factor: 0.0,
2177 target_partitions: 3,
2178 expected_partition_count: 0, },
2180 ];
2181
2182 for case in test_cases {
2183 println!("Running test case: {}", case.name);
2184
2185 let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2187
2188 let result =
2190 FileScanConfig::split_groups_by_statistics_with_target_partitions(
2191 &schema,
2192 &file_groups,
2193 &sort_ordering,
2194 case.target_partitions,
2195 )?;
2196
2197 println!(
2199 "Created {} partitions (target was {})",
2200 result.len(),
2201 case.target_partitions
2202 );
2203
2204 assert_eq!(
2206 result.len(),
2207 case.expected_partition_count,
2208 "Case '{}': Unexpected partition count",
2209 case.name
2210 );
2211
2212 assert!(
2214 verify_sort_integrity(&result),
2215 "Case '{}': Files within partitions are not properly ordered",
2216 case.name
2217 );
2218
2219 if case.file_count > 1 && case.expected_partition_count > 1 {
2221 let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2222 let max_size = *group_sizes.iter().max().unwrap();
2223 let min_size = *group_sizes.iter().min().unwrap();
2224
2225 let avg_files_per_partition =
2227 case.file_count as f64 / case.expected_partition_count as f64;
2228 assert!(
2229 (max_size as f64) < 2.0 * avg_files_per_partition,
2230 "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2231 case.name,
2232 max_size,
2233 avg_files_per_partition
2234 );
2235
2236 println!("Distribution - min files: {min_size}, max files: {max_size}");
2237 }
2238 }
2239
2240 let empty_groups: Vec<FileGroup> = vec![];
2242 let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2243 &schema,
2244 &empty_groups,
2245 &sort_ordering,
2246 0,
2247 )
2248 .unwrap_err();
2249
2250 assert!(
2251 err.to_string()
2252 .contains("target_partitions must be greater than 0"),
2253 "Expected error for zero target partitions"
2254 );
2255
2256 Ok(())
2257 }
2258
2259 #[test]
2260 fn test_partition_statistics_projection() {
2261 use crate::source::DataSourceExec;
2267 use datafusion_physical_plan::ExecutionPlan;
2268
2269 let schema = Arc::new(Schema::new(vec![
2271 Field::new("col0", DataType::Int32, false),
2272 Field::new("col1", DataType::Int32, false),
2273 Field::new("col2", DataType::Int32, false),
2274 Field::new("col3", DataType::Int32, false),
2275 ]));
2276
2277 let file_group_stats = Statistics {
2279 num_rows: Precision::Exact(100),
2280 total_byte_size: Precision::Exact(1024),
2281 column_statistics: vec![
2282 ColumnStatistics {
2283 null_count: Precision::Exact(0),
2284 ..ColumnStatistics::new_unknown()
2285 },
2286 ColumnStatistics {
2287 null_count: Precision::Exact(5),
2288 ..ColumnStatistics::new_unknown()
2289 },
2290 ColumnStatistics {
2291 null_count: Precision::Exact(10),
2292 ..ColumnStatistics::new_unknown()
2293 },
2294 ColumnStatistics {
2295 null_count: Precision::Exact(15),
2296 ..ColumnStatistics::new_unknown()
2297 },
2298 ],
2299 };
2300
2301 let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2303 .with_statistics(Arc::new(file_group_stats));
2304
2305 let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2306
2307 let config = FileScanConfigBuilder::new(
2309 ObjectStoreUrl::parse("test:///").unwrap(),
2310 Arc::new(MockSource::new(table_schema.clone())),
2311 )
2312 .with_projection_indices(Some(vec![0, 2]))
2313 .unwrap() .with_file_groups(vec![file_group])
2315 .build();
2316
2317 let exec = DataSourceExec::from_data_source(config);
2319
2320 let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2322
2323 assert_eq!(
2325 partition_stats.column_statistics.len(),
2326 2,
2327 "Expected 2 column statistics (projected), but got {}",
2328 partition_stats.column_statistics.len()
2329 );
2330
2331 assert_eq!(
2333 partition_stats.column_statistics[0].null_count,
2334 Precision::Exact(0),
2335 "First projected column should be col0 with 0 nulls"
2336 );
2337 assert_eq!(
2338 partition_stats.column_statistics[1].null_count,
2339 Precision::Exact(10),
2340 "Second projected column should be col2 with 10 nulls"
2341 );
2342
2343 assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2345 assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2346 }
2347
2348 #[tokio::test]
2355 async fn reset_state_recreates_shared_work_source() -> Result<()> {
2356 let schema = Arc::new(Schema::new(vec![Field::new(
2357 "value",
2358 DataType::Int32,
2359 false,
2360 )]));
2361 let file_source = Arc::new(
2362 MockSource::new(Arc::clone(&schema))
2363 .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })),
2364 );
2365
2366 let config =
2367 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2368 .with_file_group(FileGroup::new(vec![
2369 PartitionedFile::new("file1.parquet", 100),
2370 PartitionedFile::new("file2.parquet", 100),
2371 ]))
2372 .build();
2373
2374 let exec: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
2375 let task_ctx = Arc::new(TaskContext::default());
2376
2377 let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?;
2380 let reset_exec = exec.reset_state()?;
2381 let second_run = collect(reset_exec, task_ctx).await?;
2382
2383 let expected = [
2384 "+-------+",
2385 "| value |",
2386 "+-------+",
2387 "| 1 |",
2388 "| 2 |",
2389 "+-------+",
2390 ];
2391 assert_batches_eq!(expected, &first_run);
2392 assert_batches_eq!(expected, &second_run);
2393
2394 Ok(())
2395 }
2396
    /// Test-only file opener that never reads real files: each opened file
    /// yields a single one-row batch whose value is parsed from the file name.
    #[derive(Debug)]
    struct ResetStateTestFileOpener {
        /// Schema used for every batch this opener produces.
        schema: SchemaRef,
    }
2403
2404 impl crate::file_stream::FileOpener for ResetStateTestFileOpener {
2405 fn open(
2406 &self,
2407 file: PartitionedFile,
2408 ) -> Result<crate::file_stream::FileOpenFuture> {
2409 let value = file
2410 .object_meta
2411 .location
2412 .as_ref()
2413 .trim_start_matches("file")
2414 .trim_end_matches(".parquet")
2415 .parse::<i32>()
2416 .expect("invalid test file name");
2417 let schema = Arc::clone(&self.schema);
2418 Ok(async move {
2419 let batch = RecordBatch::try_new(
2420 schema,
2421 vec![Arc::new(Int32Array::from(vec![value]))],
2422 )
2423 .expect("test batch should be valid");
2424 Ok(stream::iter(vec![Ok(batch)]).boxed())
2425 }
2426 .boxed())
2427 }
2428 }
2429
2430 #[test]
2431 fn test_output_partitioning_not_partitioned_by_file_group() {
2432 let file_schema = aggr_test_schema();
2433 let partition_col =
2434 Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2435
2436 let config = config_for_projection(
2437 Arc::clone(&file_schema),
2438 None,
2439 Statistics::new_unknown(&file_schema),
2440 vec![partition_col],
2441 );
2442
2443 let partitioning = config.output_partitioning();
2445 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2446 }
2447
2448 #[test]
2449 fn test_output_partitioning_no_partition_columns() {
2450 let file_schema = aggr_test_schema();
2451 let mut config = config_for_projection(
2452 Arc::clone(&file_schema),
2453 None,
2454 Statistics::new_unknown(&file_schema),
2455 vec![], );
2457 config.partitioned_by_file_group = true;
2458
2459 let partitioning = config.output_partitioning();
2460 assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2461 }
2462
2463 #[test]
2464 fn test_output_partitioning_with_partition_columns() {
2465 let file_schema = aggr_test_schema();
2466
2467 let single_partition_col = vec![Field::new(
2469 "date",
2470 wrap_partition_type_in_dict(DataType::Utf8),
2471 false,
2472 )];
2473
2474 let mut config = config_for_projection(
2475 Arc::clone(&file_schema),
2476 None,
2477 Statistics::new_unknown(&file_schema),
2478 single_partition_col,
2479 );
2480 config.partitioned_by_file_group = true;
2481 config.file_groups = vec![
2482 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2483 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2484 FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2485 ];
2486
2487 let partitioning = config.output_partitioning();
2488 match partitioning {
2489 Partitioning::Hash(exprs, num_partitions) => {
2490 assert_eq!(num_partitions, 3);
2491 assert_eq!(exprs.len(), 1);
2492 assert_eq!(exprs[0].downcast_ref::<Column>().unwrap().name(), "date");
2493 }
2494 _ => panic!("Expected Hash partitioning"),
2495 }
2496
2497 let multiple_partition_cols = vec![
2499 Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2500 Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2501 ];
2502
2503 config = config_for_projection(
2504 Arc::clone(&file_schema),
2505 None,
2506 Statistics::new_unknown(&file_schema),
2507 multiple_partition_cols,
2508 );
2509 config.partitioned_by_file_group = true;
2510 config.file_groups = vec![
2511 FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2512 FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2513 ];
2514
2515 let partitioning = config.output_partitioning();
2516 match partitioning {
2517 Partitioning::Hash(exprs, num_partitions) => {
2518 assert_eq!(num_partitions, 2);
2519 assert_eq!(exprs.len(), 2);
2520 let col_names: Vec<_> = exprs
2521 .iter()
2522 .map(|e| e.downcast_ref::<Column>().unwrap().name())
2523 .collect();
2524 assert_eq!(col_names, vec!["year", "month"]);
2525 }
2526 _ => panic!("Expected Hash partitioning"),
2527 }
2528 }
2529
2530 #[test]
2531 fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
2532 -> Result<()> {
2533 let file_schema =
2534 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
2535
2536 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2537 let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2538
2539 let file_groups = vec![FileGroup::new(vec![
2540 PartitionedFile::new("file1", 1),
2541 PartitionedFile::new("file2", 1),
2542 ])];
2543
2544 let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2545 let config =
2546 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2547 .with_file_groups(file_groups)
2548 .with_output_ordering(vec![
2549 LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
2550 ])
2551 .build();
2552
2553 let requested_asc = vec![sort_expr_asc.clone()];
2554 let result = config.try_pushdown_sort(&requested_asc)?;
2555 let SortOrderPushdownResult::Inexact { inner } = result else {
2556 panic!("Expected Inexact result");
2557 };
2558 let pushed_config = inner
2559 .downcast_ref::<FileScanConfig>()
2560 .expect("Expected FileScanConfig");
2561 let pushed_files = pushed_config.file_groups[0].files();
2562 assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
2563 assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
2564
2565 let requested_desc = vec![sort_expr_asc.reverse()];
2566 let result = config.try_pushdown_sort(&requested_desc)?;
2567 let SortOrderPushdownResult::Inexact { inner } = result else {
2568 panic!("Expected Inexact result");
2569 };
2570 let pushed_config = inner
2571 .downcast_ref::<FileScanConfig>()
2572 .expect("Expected FileScanConfig");
2573 let pushed_files = pushed_config.file_groups[0].files();
2574 assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
2575 assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
2576
2577 Ok(())
2578 }
2579
2580 fn make_file_with_stats(name: &str, min: f64, max: f64) -> PartitionedFile {
2581 PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new(
2582 Statistics {
2583 num_rows: Precision::Exact(100),
2584 total_byte_size: Precision::Exact(1024),
2585 column_statistics: vec![ColumnStatistics {
2586 null_count: Precision::Exact(0),
2587 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
2588 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
2589 ..Default::default()
2590 }],
2591 },
2592 ))
2593 }
2594
    /// Test-only [`FileSource`] whose `try_pushdown_sort` always answers
    /// `Exact`, whatever ordering is requested.
    #[derive(Clone)]
    struct ExactSortPushdownSource {
        /// Metrics set handed out by `metrics()`.
        metrics: ExecutionPlanMetricsSet,
        /// Schema handed out by `table_schema()`.
        table_schema: TableSchema,
    }
2600
2601 impl ExactSortPushdownSource {
2602 fn new(table_schema: TableSchema) -> Self {
2603 Self {
2604 metrics: ExecutionPlanMetricsSet::new(),
2605 table_schema,
2606 }
2607 }
2608 }
2609
2610 impl FileSource for ExactSortPushdownSource {
2611 fn create_file_opener(
2612 &self,
2613 _object_store: Arc<dyn ObjectStore>,
2614 _base_config: &FileScanConfig,
2615 _partition: usize,
2616 ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
2617 unimplemented!()
2618 }
2619
2620 fn table_schema(&self) -> &TableSchema {
2621 &self.table_schema
2622 }
2623
2624 fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
2625 Arc::new(self.clone())
2626 }
2627
2628 fn metrics(&self) -> &ExecutionPlanMetricsSet {
2629 &self.metrics
2630 }
2631
2632 fn file_type(&self) -> &str {
2633 "mock_exact"
2634 }
2635
2636 fn try_pushdown_sort(
2637 &self,
2638 _order: &[PhysicalSortExpr],
2639 _eq_properties: &EquivalenceProperties,
2640 ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
2641 Ok(SortOrderPushdownResult::Exact {
2642 inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
2643 })
2644 }
2645 }
2646
2647 #[test]
2648 fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> {
2649 let file_schema =
2650 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2651 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2652 let file_source = Arc::new(MockSource::new(table_schema));
2653
2654 let file_groups = vec![FileGroup::new(vec![
2655 make_file_with_stats("file3", 20.0, 30.0),
2656 make_file_with_stats("file1", 0.0, 9.0),
2657 make_file_with_stats("file2", 10.0, 19.0),
2658 ])];
2659
2660 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2661 let config =
2662 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2663 .with_file_groups(file_groups)
2664 .build();
2665
2666 let result = config.try_pushdown_sort(&[sort_expr])?;
2667 let SortOrderPushdownResult::Inexact { inner } = result else {
2668 panic!("Expected Inexact result, got {result:?}");
2669 };
2670 let pushed_config = inner
2671 .downcast_ref::<FileScanConfig>()
2672 .expect("Expected FileScanConfig");
2673 let files = pushed_config.file_groups[0].files();
2674 assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2675 assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2676 assert_eq!(files[2].object_meta.location.as_ref(), "file3");
2677 assert!(pushed_config.output_ordering.is_empty());
2678 Ok(())
2679 }
2680
2681 #[test]
2682 fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> {
2683 let file_schema =
2684 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2685 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2686 let file_source = Arc::new(MockSource::new(table_schema));
2687
2688 let file_groups = vec![FileGroup::new(vec![
2689 make_file_with_stats("file1", 0.0, 9.0),
2690 make_file_with_stats("file2", 10.0, 19.0),
2691 make_file_with_stats("file3", 20.0, 30.0),
2692 ])];
2693
2694 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2695 let config =
2696 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2697 .with_file_groups(file_groups)
2698 .build();
2699
2700 let result = config.try_pushdown_sort(&[sort_expr])?;
2701 assert!(matches!(result, SortOrderPushdownResult::Unsupported));
2702 Ok(())
2703 }
2704
2705 #[test]
2706 fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> {
2707 let file_schema =
2708 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2709 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2710 let file_source = Arc::new(MockSource::new(table_schema));
2711
2712 let file_groups = vec![FileGroup::new(vec![
2713 make_file_with_stats("file1", 0.0, 9.0),
2714 make_file_with_stats("file3", 20.0, 30.0),
2715 make_file_with_stats("file2", 10.0, 19.0),
2716 ])];
2717
2718 let sort_expr = PhysicalSortExpr::new(
2719 Arc::new(Column::new("a", 0)),
2720 arrow::compute::SortOptions {
2721 descending: true,
2722 nulls_first: true,
2723 },
2724 );
2725 let config =
2726 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2727 .with_file_groups(file_groups)
2728 .build();
2729
2730 let result = config.try_pushdown_sort(&[sort_expr])?;
2731 let SortOrderPushdownResult::Inexact { inner } = result else {
2732 panic!("Expected Inexact result");
2733 };
2734 let pushed_config = inner
2735 .downcast_ref::<FileScanConfig>()
2736 .expect("Expected FileScanConfig");
2737 let files = pushed_config.file_groups[0].files();
2738 assert_eq!(files[0].object_meta.location.as_ref(), "file3");
2739 assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2740 assert_eq!(files[2].object_meta.location.as_ref(), "file1");
2741 Ok(())
2742 }
2743
2744 #[test]
2745 fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> {
2746 let file_schema =
2747 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2748 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2749 let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2750
2751 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2752
2753 let file_groups = vec![FileGroup::new(vec![
2754 make_file_with_stats("file1", 0.0, 9.0),
2755 make_file_with_stats("file2", 10.0, 19.0),
2756 make_file_with_stats("file3", 20.0, 30.0),
2757 ])];
2758
2759 let config =
2760 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2761 .with_file_groups(file_groups)
2762 .with_output_ordering(vec![
2763 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2764 ])
2765 .build();
2766
2767 let result = config.try_pushdown_sort(&[sort_expr])?;
2768 let SortOrderPushdownResult::Exact { inner } = result else {
2769 panic!("Expected Exact result, got {result:?}");
2770 };
2771 let pushed_config = inner
2772 .downcast_ref::<FileScanConfig>()
2773 .expect("Expected FileScanConfig");
2774 assert!(!pushed_config.output_ordering.is_empty());
2775 Ok(())
2776 }
2777
2778 #[test]
2779 fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> {
2780 let file_schema =
2781 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2782 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2783 let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2784
2785 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2786
2787 let file_groups = vec![FileGroup::new(vec![
2788 make_file_with_stats("file1", 0.0, 15.0),
2789 make_file_with_stats("file2", 10.0, 25.0),
2790 make_file_with_stats("file3", 20.0, 30.0),
2791 ])];
2792
2793 let config =
2794 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2795 .with_file_groups(file_groups)
2796 .with_output_ordering(vec![
2797 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2798 ])
2799 .build();
2800
2801 let result = config.try_pushdown_sort(&[sort_expr])?;
2802 let SortOrderPushdownResult::Inexact { inner } = result else {
2803 panic!("Expected Inexact (downgraded), got {result:?}");
2804 };
2805 let pushed_config = inner
2806 .downcast_ref::<FileScanConfig>()
2807 .expect("Expected FileScanConfig");
2808 assert!(pushed_config.output_ordering.is_empty());
2809 Ok(())
2810 }
2811
2812 #[test]
2813 fn sort_pushdown_exact_source_out_of_order_returns_exact() -> Result<()> {
2814 let file_schema =
2815 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2816 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2817 let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2818
2819 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2820
2821 let file_groups = vec![FileGroup::new(vec![
2822 make_file_with_stats("file3", 20.0, 30.0),
2823 make_file_with_stats("file1", 0.0, 9.0),
2824 make_file_with_stats("file2", 10.0, 19.0),
2825 ])];
2826
2827 let config =
2828 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2829 .with_file_groups(file_groups)
2830 .with_output_ordering(vec![
2831 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2832 ])
2833 .build();
2834
2835 let result = config.try_pushdown_sort(&[sort_expr])?;
2836 let SortOrderPushdownResult::Exact { inner } = result else {
2837 panic!("Expected Exact result, got {result:?}");
2838 };
2839 let pushed_config = inner
2840 .downcast_ref::<FileScanConfig>()
2841 .expect("Expected FileScanConfig");
2842 let files = pushed_config.file_groups[0].files();
2843 assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2844 assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2845 assert_eq!(files[2].object_meta.location.as_ref(), "file3");
2846 assert!(!pushed_config.output_ordering.is_empty());
2847 Ok(())
2848 }
2849
2850 #[test]
2851 fn sort_pushdown_unsupported_source_single_file_groups() -> Result<()> {
2852 let file_schema =
2853 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2854 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2855 let file_source = Arc::new(MockSource::new(table_schema));
2856
2857 let file_groups = vec![
2858 FileGroup::new(vec![make_file_with_stats("file1", 0.0, 9.0)]),
2859 FileGroup::new(vec![make_file_with_stats("file2", 10.0, 19.0)]),
2860 ];
2861
2862 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2863 let config =
2864 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2865 .with_file_groups(file_groups)
2866 .build();
2867
2868 let result = config.try_pushdown_sort(&[sort_expr])?;
2869 assert!(
2870 matches!(result, SortOrderPushdownResult::Unsupported),
2871 "Expected Unsupported for single-file groups"
2872 );
2873 Ok(())
2874 }
2875
2876 #[test]
2877 fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> {
2878 let file_schema =
2879 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2880 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2881 let file_source = Arc::new(MockSource::new(table_schema));
2882
2883 let file_groups = vec![
2884 FileGroup::new(vec![
2885 make_file_with_stats("file_b", 10.0, 19.0),
2886 make_file_with_stats("file_a", 0.0, 9.0),
2887 ]),
2888 FileGroup::new(vec![
2889 make_file_with_stats("file_d", 30.0, 39.0),
2890 make_file_with_stats("file_c", 20.0, 29.0),
2891 ]),
2892 ];
2893
2894 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2895 let config =
2896 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2897 .with_file_groups(file_groups)
2898 .build();
2899
2900 let result = config.try_pushdown_sort(&[sort_expr])?;
2901 let SortOrderPushdownResult::Inexact { inner } = result else {
2902 panic!("Expected Inexact result");
2903 };
2904 let pushed_config = inner
2905 .downcast_ref::<FileScanConfig>()
2906 .expect("Expected FileScanConfig");
2907 let files0 = pushed_config.file_groups[0].files();
2908 assert_eq!(files0[0].object_meta.location.as_ref(), "file_a");
2909 assert_eq!(files0[1].object_meta.location.as_ref(), "file_b");
2910 let files1 = pushed_config.file_groups[1].files();
2911 assert_eq!(files1[0].object_meta.location.as_ref(), "file_c");
2912 assert_eq!(files1[1].object_meta.location.as_ref(), "file_d");
2913 Ok(())
2914 }
2915
2916 #[test]
2917 fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> {
2918 let file_schema =
2919 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2920 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2921 let file_source = Arc::new(MockSource::new(table_schema));
2922
2923 let file_groups = vec![
2924 FileGroup::new(vec![
2925 make_file_with_stats("file_b", 10.0, 19.0),
2926 make_file_with_stats("file_a", 0.0, 9.0),
2927 ]),
2928 FileGroup::new(vec![
2929 PartitionedFile::new("file_d".to_string(), 1024),
2930 PartitionedFile::new("file_c".to_string(), 1024),
2931 ]),
2932 ];
2933
2934 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2935 let config =
2936 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2937 .with_file_groups(file_groups)
2938 .build();
2939
2940 let result = config.try_pushdown_sort(&[sort_expr])?;
2941 let SortOrderPushdownResult::Inexact { inner } = result else {
2942 panic!("Expected Inexact result");
2943 };
2944 let pushed_config = inner
2945 .downcast_ref::<FileScanConfig>()
2946 .expect("Expected FileScanConfig");
2947 let files0 = pushed_config.file_groups[0].files();
2948 assert_eq!(files0[0].object_meta.location.as_ref(), "file_a");
2949 assert_eq!(files0[1].object_meta.location.as_ref(), "file_b");
2950 let files1 = pushed_config.file_groups[1].files();
2951 assert_eq!(files1[0].object_meta.location.as_ref(), "file_d");
2952 assert_eq!(files1[1].object_meta.location.as_ref(), "file_c");
2953 Ok(())
2954 }
2955
2956 #[test]
2957 fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> {
2958 let file_schema =
2959 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2960 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2961 let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2962
2963 let file_groups = vec![FileGroup::new(vec![
2964 make_file_with_stats("file2", 10.0, 19.0),
2965 make_file_with_stats("file1", 0.0, 9.0),
2966 ])];
2967
2968 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2969 let config =
2970 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2971 .with_file_groups(file_groups)
2972 .build();
2973
2974 let result = config.try_pushdown_sort(&[sort_expr])?;
2975 let SortOrderPushdownResult::Inexact { inner } = result else {
2976 panic!("Expected Inexact result");
2977 };
2978 let pushed_config = inner
2979 .downcast_ref::<FileScanConfig>()
2980 .expect("Expected FileScanConfig");
2981 let files = pushed_config.file_groups[0].files();
2982 assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2983 assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2984 assert!(pushed_config.output_ordering.is_empty());
2985 Ok(())
2986 }
2987
2988 #[test]
2989 fn sort_pushdown_exact_multi_group_preserves_parallelism() -> Result<()> {
2990 let file_schema =
2996 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2997 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2998 let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2999
3000 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3001
3002 let file_groups = vec![
3006 FileGroup::new(vec![
3007 make_file_with_stats("file_01", 0.0, 9.0),
3008 make_file_with_stats("file_03", 20.0, 29.0),
3009 ]),
3010 FileGroup::new(vec![
3011 make_file_with_stats("file_02", 10.0, 19.0),
3012 make_file_with_stats("file_04", 30.0, 39.0),
3013 ]),
3014 ];
3015
3016 let config =
3017 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3018 .with_file_groups(file_groups)
3019 .with_output_ordering(vec![
3020 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3021 ])
3022 .build();
3023
3024 let result = config.try_pushdown_sort(&[sort_expr])?;
3025 let SortOrderPushdownResult::Exact { inner } = result else {
3026 panic!("Expected Exact result, got {result:?}");
3027 };
3028 let pushed_config = inner
3029 .downcast_ref::<FileScanConfig>()
3030 .expect("Expected FileScanConfig");
3031
3032 assert_eq!(pushed_config.file_groups.len(), 2);
3034
3035 let files0 = pushed_config.file_groups[0].files();
3038 assert_eq!(files0[0].object_meta.location.as_ref(), "file_01");
3039 assert_eq!(files0[1].object_meta.location.as_ref(), "file_03");
3040 let files1 = pushed_config.file_groups[1].files();
3041 assert_eq!(files1[0].object_meta.location.as_ref(), "file_02");
3042 assert_eq!(files1[1].object_meta.location.as_ref(), "file_04");
3043
3044 assert!(!pushed_config.output_ordering.is_empty());
3046 Ok(())
3047 }
3048
3049 #[test]
3050 fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> {
3051 let file_schema =
3054 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
3055 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3056 let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
3057
3058 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3059
3060 let file_groups = vec![FileGroup::new(vec![
3062 make_file_with_stats("file1", 0.0, 9.0),
3063 make_file_with_stats("file2", 10.0, 19.0),
3064 make_file_with_stats("file3", 20.0, 30.0),
3065 ])];
3066
3067 let config =
3068 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3069 .with_file_groups(file_groups)
3070 .with_output_ordering(vec![
3071 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3072 ])
3073 .build();
3074
3075 let result = config.try_pushdown_sort(&[sort_expr.reverse()])?;
3077 let SortOrderPushdownResult::Inexact { inner } = result else {
3078 panic!("Expected Inexact for reverse scan, got {result:?}");
3079 };
3080 let pushed_config = inner
3081 .downcast_ref::<FileScanConfig>()
3082 .expect("Expected FileScanConfig");
3083
3084 let files = pushed_config.file_groups[0].files();
3086 assert_eq!(files[0].object_meta.location.as_ref(), "file3");
3087 assert_eq!(files[1].object_meta.location.as_ref(), "file2");
3088 assert_eq!(files[2].object_meta.location.as_ref(), "file1");
3089
3090 assert!(pushed_config.output_ordering.is_empty());
3092 Ok(())
3093 }
3094
3095 fn make_file_with_null_stats(
3097 name: &str,
3098 min: f64,
3099 max: f64,
3100 null_count: usize,
3101 ) -> PartitionedFile {
3102 PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new(
3103 Statistics {
3104 num_rows: Precision::Exact(100),
3105 total_byte_size: Precision::Exact(1024),
3106 column_statistics: vec![ColumnStatistics {
3107 null_count: Precision::Exact(null_count),
3108 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
3109 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
3110 ..Default::default()
3111 }],
3112 },
3113 ))
3114 }
3115
3116 #[test]
3117 fn sort_pushdown_unsupported_with_nulls_does_not_upgrade_to_exact() -> Result<()> {
3118 let file_schema =
3121 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3122 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3123 let file_source = Arc::new(MockSource::new(table_schema));
3124
3125 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3126
3127 let file_groups = vec![FileGroup::new(vec![
3129 make_file_with_null_stats("b_no_nulls", 10.0, 19.0, 0),
3130 make_file_with_null_stats("a_with_nulls", 0.0, 9.0, 5), ])];
3132
3133 let config =
3134 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3135 .with_file_groups(file_groups)
3136 .with_output_ordering(vec![
3137 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3138 ])
3139 .build();
3140
3141 let result = config.try_pushdown_sort(&[sort_expr])?;
3142 assert!(
3144 matches!(result, SortOrderPushdownResult::Inexact { .. }),
3145 "Expected Inexact due to NULLs, got {result:?}"
3146 );
3147 Ok(())
3148 }
3149
3150 #[test]
3151 fn sort_pushdown_unsupported_no_nulls_upgrades_to_exact() -> Result<()> {
3152 let file_schema =
3154 Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3155 let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3156 let file_source = Arc::new(MockSource::new(table_schema));
3157
3158 let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3159
3160 let file_groups = vec![FileGroup::new(vec![
3161 make_file_with_null_stats("b_high", 10.0, 19.0, 0),
3162 make_file_with_null_stats("a_low", 0.0, 9.0, 0),
3163 ])];
3164
3165 let config =
3166 FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3167 .with_file_groups(file_groups)
3168 .with_output_ordering(vec![
3169 LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3170 ])
3171 .build();
3172
3173 let result = config.try_pushdown_sort(&[sort_expr])?;
3174 assert!(
3175 matches!(result, SortOrderPushdownResult::Exact { .. }),
3176 "Expected Exact (no NULLs), got {result:?}"
3177 );
3178 Ok(())
3179 }
3180}