1use std::{
22 any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23 fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use arrow::{
27 array::{
28 ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
29 RecordBatchOptions,
30 },
31 buffer::Buffer,
32 datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
33};
34use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
35use datafusion_common::{DataFusionError, ScalarValue};
36use datafusion_execution::{
37 object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
38};
39use datafusion_physical_expr::{
40 expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
41 PhysicalSortExpr,
42};
43use datafusion_physical_plan::{
44 display::{display_orderings, ProjectSchemaDisplay},
45 metrics::ExecutionPlanMetricsSet,
46 projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
47 DisplayAs, DisplayFormatType, ExecutionPlan,
48};
49use log::{debug, warn};
50
51use crate::file_groups::FileGroup;
52use crate::{
53 display::FileGroupsDisplay,
54 file::FileSource,
55 file_compression_type::FileCompressionType,
56 file_stream::FileStream,
57 source::{DataSource, DataSourceExec},
58 statistics::MinMaxStatistics,
59 PartitionedFile,
60};
61
62#[derive(Clone)]
125pub struct FileScanConfig {
126 pub object_store_url: ObjectStoreUrl,
138 pub file_schema: SchemaRef,
145 pub file_groups: Vec<FileGroup>,
155 pub constraints: Constraints,
157 pub projection: Option<Vec<usize>>,
160 pub limit: Option<usize>,
163 pub table_partition_cols: Vec<Field>,
165 pub output_ordering: Vec<LexOrdering>,
167 pub file_compression_type: FileCompressionType,
169 pub new_lines_in_values: bool,
171 pub file_source: Arc<dyn FileSource>,
173 pub batch_size: Option<usize>,
176}
177
178#[derive(Clone)]
228pub struct FileScanConfigBuilder {
229 object_store_url: ObjectStoreUrl,
230 file_schema: SchemaRef,
235 file_source: Arc<dyn FileSource>,
236
237 limit: Option<usize>,
238 projection: Option<Vec<usize>>,
239 table_partition_cols: Vec<Field>,
240 constraints: Option<Constraints>,
241 file_groups: Vec<FileGroup>,
242 statistics: Option<Statistics>,
243 output_ordering: Vec<LexOrdering>,
244 file_compression_type: Option<FileCompressionType>,
245 new_lines_in_values: Option<bool>,
246 batch_size: Option<usize>,
247}
248
249impl FileScanConfigBuilder {
250 pub fn new(
257 object_store_url: ObjectStoreUrl,
258 file_schema: SchemaRef,
259 file_source: Arc<dyn FileSource>,
260 ) -> Self {
261 Self {
262 object_store_url,
263 file_schema,
264 file_source,
265 file_groups: vec![],
266 statistics: None,
267 output_ordering: vec![],
268 file_compression_type: None,
269 new_lines_in_values: None,
270 limit: None,
271 projection: None,
272 table_partition_cols: vec![],
273 constraints: None,
274 batch_size: None,
275 }
276 }
277
278 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
281 self.limit = limit;
282 self
283 }
284
285 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
290 self.file_source = file_source;
291 self
292 }
293
294 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
297 self.projection = projection;
298 self
299 }
300
301 pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
303 self.table_partition_cols = table_partition_cols;
304 self
305 }
306
307 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
309 self.constraints = Some(constraints);
310 self
311 }
312
313 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
316 self.statistics = Some(statistics);
317 self
318 }
319
320 pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
330 self.file_groups = file_groups;
331 self
332 }
333
334 pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
338 self.file_groups.push(file_group);
339 self
340 }
341
342 pub fn with_file(self, file: PartitionedFile) -> Self {
346 self.with_file_group(FileGroup::new(vec![file]))
347 }
348
349 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
351 self.output_ordering = output_ordering;
352 self
353 }
354
355 pub fn with_file_compression_type(
357 mut self,
358 file_compression_type: FileCompressionType,
359 ) -> Self {
360 self.file_compression_type = Some(file_compression_type);
361 self
362 }
363
364 pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
370 self.new_lines_in_values = Some(new_lines_in_values);
371 self
372 }
373
374 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
376 self.batch_size = batch_size;
377 self
378 }
379
380 pub fn build(self) -> FileScanConfig {
385 let Self {
386 object_store_url,
387 file_schema,
388 file_source,
389 limit,
390 projection,
391 table_partition_cols,
392 constraints,
393 file_groups,
394 statistics,
395 output_ordering,
396 file_compression_type,
397 new_lines_in_values,
398 batch_size,
399 } = self;
400
401 let constraints = constraints.unwrap_or_default();
402 let statistics =
403 statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
404
405 let file_source = file_source.with_statistics(statistics.clone());
406 let file_compression_type =
407 file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
408 let new_lines_in_values = new_lines_in_values.unwrap_or(false);
409
410 FileScanConfig {
411 object_store_url,
412 file_schema,
413 file_source,
414 limit,
415 projection,
416 table_partition_cols,
417 constraints,
418 file_groups,
419 output_ordering,
420 file_compression_type,
421 new_lines_in_values,
422 batch_size,
423 }
424 }
425}
426
427impl From<FileScanConfig> for FileScanConfigBuilder {
428 fn from(config: FileScanConfig) -> Self {
429 Self {
430 object_store_url: config.object_store_url,
431 file_schema: config.file_schema,
432 file_source: Arc::<dyn FileSource>::clone(&config.file_source),
433 file_groups: config.file_groups,
434 statistics: config.file_source.statistics().ok(),
435 output_ordering: config.output_ordering,
436 file_compression_type: Some(config.file_compression_type),
437 new_lines_in_values: Some(config.new_lines_in_values),
438 limit: config.limit,
439 projection: config.projection,
440 table_partition_cols: config.table_partition_cols,
441 constraints: Some(config.constraints),
442 batch_size: config.batch_size,
443 }
444 }
445}
446
447impl DataSource for FileScanConfig {
448 fn open(
449 &self,
450 partition: usize,
451 context: Arc<TaskContext>,
452 ) -> Result<SendableRecordBatchStream> {
453 let object_store = context.runtime_env().object_store(&self.object_store_url)?;
454 let batch_size = self
455 .batch_size
456 .unwrap_or_else(|| context.session_config().batch_size());
457
458 let source = self
459 .file_source
460 .with_batch_size(batch_size)
461 .with_schema(Arc::clone(&self.file_schema))
462 .with_projection(self);
463
464 let opener = source.create_file_opener(object_store, self, partition);
465
466 let stream = FileStream::new(self, partition, opener, source.metrics())?;
467 Ok(Box::pin(stream))
468 }
469
470 fn as_any(&self) -> &dyn Any {
471 self
472 }
473
474 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
475 match t {
476 DisplayFormatType::Default | DisplayFormatType::Verbose => {
477 let (schema, _, _, orderings) = self.project();
478
479 write!(f, "file_groups=")?;
480 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
481
482 if !schema.fields().is_empty() {
483 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
484 }
485
486 if let Some(limit) = self.limit {
487 write!(f, ", limit={limit}")?;
488 }
489
490 display_orderings(f, &orderings)?;
491
492 if !self.constraints.is_empty() {
493 write!(f, ", {}", self.constraints)?;
494 }
495
496 self.fmt_file_source(t, f)
497 }
498 DisplayFormatType::TreeRender => {
499 writeln!(f, "format={}", self.file_source.file_type())?;
500 self.file_source.fmt_extra(t, f)?;
501 let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
502 writeln!(f, "files={num_files}")?;
503 Ok(())
504 }
505 }
506 }
507
508 fn repartitioned(
510 &self,
511 target_partitions: usize,
512 repartition_file_min_size: usize,
513 output_ordering: Option<LexOrdering>,
514 ) -> Result<Option<Arc<dyn DataSource>>> {
515 let source = self.file_source.repartitioned(
516 target_partitions,
517 repartition_file_min_size,
518 output_ordering,
519 self,
520 )?;
521
522 Ok(source.map(|s| Arc::new(s) as _))
523 }
524
525 fn output_partitioning(&self) -> Partitioning {
526 Partitioning::UnknownPartitioning(self.file_groups.len())
527 }
528
529 fn eq_properties(&self) -> EquivalenceProperties {
530 let (schema, constraints, _, orderings) = self.project();
531 EquivalenceProperties::new_with_orderings(schema, orderings.as_slice())
532 .with_constraints(constraints)
533 }
534
535 fn statistics(&self) -> Result<Statistics> {
536 Ok(self.projected_stats())
537 }
538
539 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
540 let source = FileScanConfigBuilder::from(self.clone())
541 .with_limit(limit)
542 .build();
543 Some(Arc::new(source))
544 }
545
546 fn fetch(&self) -> Option<usize> {
547 self.limit
548 }
549
550 fn metrics(&self) -> ExecutionPlanMetricsSet {
551 self.file_source.metrics().clone()
552 }
553
554 fn try_swapping_with_projection(
555 &self,
556 projection: &ProjectionExec,
557 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
558 let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
562 expr.as_any()
563 .downcast_ref::<Column>()
564 .map(|expr| expr.index() >= self.file_schema.fields().len())
565 .unwrap_or(false)
566 });
567
568 let no_aliases = all_alias_free_columns(projection.expr());
570
571 Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
572 let file_scan = self.clone();
573 let source = Arc::clone(&file_scan.file_source);
574 let new_projections = new_projections_for_columns(
575 projection,
576 &file_scan
577 .projection
578 .clone()
579 .unwrap_or((0..self.file_schema.fields().len()).collect()),
580 );
581 DataSourceExec::from_data_source(
582 FileScanConfigBuilder::from(file_scan)
583 .with_projection(Some(new_projections))
585 .with_source(source)
586 .build(),
587 ) as _
588 }))
589 }
590}
591
592impl FileScanConfig {
593 #[allow(deprecated)] pub fn new(
605 object_store_url: ObjectStoreUrl,
606 file_schema: SchemaRef,
607 file_source: Arc<dyn FileSource>,
608 ) -> Self {
609 let statistics = Statistics::new_unknown(&file_schema);
610 let file_source = file_source.with_statistics(statistics.clone());
611 Self {
612 object_store_url,
613 file_schema,
614 file_groups: vec![],
615 constraints: Constraints::empty(),
616 projection: None,
617 limit: None,
618 table_partition_cols: vec![],
619 output_ordering: vec![],
620 file_compression_type: FileCompressionType::UNCOMPRESSED,
621 new_lines_in_values: false,
622 file_source: Arc::clone(&file_source),
623 batch_size: None,
624 }
625 }
626
627 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
629 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
630 self.file_source =
631 file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
632 self
633 }
634
635 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
637 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
638 self.constraints = constraints;
639 self
640 }
641
642 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
644 pub fn with_statistics(mut self, statistics: Statistics) -> Self {
645 self.file_source = self.file_source.with_statistics(statistics);
646 self
647 }
648
649 fn projection_indices(&self) -> Vec<usize> {
650 match &self.projection {
651 Some(proj) => proj.clone(),
652 None => (0..self.file_schema.fields().len()
653 + self.table_partition_cols.len())
654 .collect(),
655 }
656 }
657
658 pub fn projected_stats(&self) -> Statistics {
659 let statistics = self.file_source.statistics().unwrap();
660
661 let table_cols_stats = self
662 .projection_indices()
663 .into_iter()
664 .map(|idx| {
665 if idx < self.file_schema.fields().len() {
666 statistics.column_statistics[idx].clone()
667 } else {
668 ColumnStatistics::new_unknown()
670 }
671 })
672 .collect();
673
674 Statistics {
675 num_rows: statistics.num_rows,
676 total_byte_size: statistics.total_byte_size,
678 column_statistics: table_cols_stats,
679 }
680 }
681
682 pub fn projected_schema(&self) -> Arc<Schema> {
683 let table_fields: Vec<_> = self
684 .projection_indices()
685 .into_iter()
686 .map(|idx| {
687 if idx < self.file_schema.fields().len() {
688 self.file_schema.field(idx).clone()
689 } else {
690 let partition_idx = idx - self.file_schema.fields().len();
691 self.table_partition_cols[partition_idx].clone()
692 }
693 })
694 .collect();
695
696 Arc::new(Schema::new_with_metadata(
697 table_fields,
698 self.file_schema.metadata().clone(),
699 ))
700 }
701
702 pub fn projected_constraints(&self) -> Constraints {
703 let indexes = self.projection_indices();
704
705 self.constraints
706 .project(&indexes)
707 .unwrap_or_else(Constraints::empty)
708 }
709
710 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
712 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
713 self.projection = projection;
714 self
715 }
716
717 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
719 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
720 self.limit = limit;
721 self
722 }
723
724 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
728 #[allow(deprecated)]
729 pub fn with_file(self, file: PartitionedFile) -> Self {
730 self.with_file_group(FileGroup::new(vec![file]))
731 }
732
733 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
737 pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self {
738 self.file_groups.append(&mut file_groups);
739 self
740 }
741
742 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
746 pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
747 self.file_groups.push(file_group);
748 self
749 }
750
751 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
753 pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
754 self.table_partition_cols = table_partition_cols;
755 self
756 }
757
758 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
760 pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
761 self.output_ordering = output_ordering;
762 self
763 }
764
765 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
767 pub fn with_file_compression_type(
768 mut self,
769 file_compression_type: FileCompressionType,
770 ) -> Self {
771 self.file_compression_type = file_compression_type;
772 self
773 }
774
775 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
777 pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
778 self.new_lines_in_values = new_lines_in_values;
779 self
780 }
781
782 #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
784 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
785 self.batch_size = batch_size;
786 self
787 }
788
789 pub fn newlines_in_values(&self) -> bool {
797 self.new_lines_in_values
798 }
799
800 pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
802 if self.projection.is_none() && self.table_partition_cols.is_empty() {
803 return (
804 Arc::clone(&self.file_schema),
805 self.constraints.clone(),
806 self.file_source.statistics().unwrap().clone(),
807 self.output_ordering.clone(),
808 );
809 }
810
811 let schema = self.projected_schema();
812 let constraints = self.projected_constraints();
813 let stats = self.projected_stats();
814
815 let output_ordering = get_projected_output_ordering(self, &schema);
816
817 (schema, constraints, stats, output_ordering)
818 }
819
820 pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
821 self.projection.as_ref().map(|p| {
822 p.iter()
823 .filter(|col_idx| **col_idx < self.file_schema.fields().len())
824 .map(|col_idx| self.file_schema.field(*col_idx).name())
825 .cloned()
826 .collect()
827 })
828 }
829
830 pub fn projected_file_schema(&self) -> SchemaRef {
832 let fields = self.file_column_projection_indices().map(|indices| {
833 indices
834 .iter()
835 .map(|col_idx| self.file_schema.field(*col_idx))
836 .cloned()
837 .collect::<Vec<_>>()
838 });
839
840 fields.map_or_else(
841 || Arc::clone(&self.file_schema),
842 |f| {
843 Arc::new(Schema::new_with_metadata(
844 f,
845 self.file_schema.metadata.clone(),
846 ))
847 },
848 )
849 }
850
851 pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
852 self.projection.as_ref().map(|p| {
853 p.iter()
854 .filter(|col_idx| **col_idx < self.file_schema.fields().len())
855 .copied()
856 .collect()
857 })
858 }
859
860 pub fn split_groups_by_statistics_with_target_partitions(
882 table_schema: &SchemaRef,
883 file_groups: &[FileGroup],
884 sort_order: &LexOrdering,
885 target_partitions: usize,
886 ) -> Result<Vec<FileGroup>> {
887 if target_partitions == 0 {
888 return Err(DataFusionError::Internal(
889 "target_partitions must be greater than 0".to_string(),
890 ));
891 }
892
893 let flattened_files = file_groups
894 .iter()
895 .flat_map(FileGroup::iter)
896 .collect::<Vec<_>>();
897
898 if flattened_files.is_empty() {
899 return Ok(vec![]);
900 }
901
902 let statistics = MinMaxStatistics::new_from_files(
903 sort_order,
904 table_schema,
905 None,
906 flattened_files.iter().copied(),
907 )?;
908
909 let indices_sorted_by_min = statistics.min_values_sorted();
910
911 let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
913
914 for (idx, min) in indices_sorted_by_min {
915 if let Some((_, group)) = file_groups_indices
916 .iter_mut()
917 .enumerate()
918 .filter(|(_, group)| {
919 group.is_empty()
920 || min
921 > statistics
922 .max(*group.last().expect("groups should not be empty"))
923 })
924 .min_by_key(|(_, group)| group.len())
925 {
926 group.push(idx);
927 } else {
928 file_groups_indices.push(vec![idx]);
930 }
931 }
932
933 file_groups_indices.retain(|group| !group.is_empty());
935
936 Ok(file_groups_indices
938 .into_iter()
939 .map(|file_group_indices| {
940 FileGroup::new(
941 file_group_indices
942 .into_iter()
943 .map(|idx| flattened_files[idx].clone())
944 .collect(),
945 )
946 })
947 .collect())
948 }
949
950 pub fn split_groups_by_statistics(
954 table_schema: &SchemaRef,
955 file_groups: &[FileGroup],
956 sort_order: &LexOrdering,
957 ) -> Result<Vec<FileGroup>> {
958 let flattened_files = file_groups
959 .iter()
960 .flat_map(FileGroup::iter)
961 .collect::<Vec<_>>();
962 if flattened_files.is_empty() {
974 return Ok(vec![]);
975 }
976
977 let statistics = MinMaxStatistics::new_from_files(
978 sort_order,
979 table_schema,
980 None,
981 flattened_files.iter().copied(),
982 )
983 .map_err(|e| {
984 e.context("construct min/max statistics for split_groups_by_statistics")
985 })?;
986
987 let indices_sorted_by_min = statistics.min_values_sorted();
988 let mut file_groups_indices: Vec<Vec<usize>> = vec![];
989
990 for (idx, min) in indices_sorted_by_min {
991 let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
992 min > statistics.max(
995 *group
996 .last()
997 .expect("groups should be nonempty at construction"),
998 )
999 });
1000 match file_group_to_insert {
1001 Some(group) => group.push(idx),
1002 None => file_groups_indices.push(vec![idx]),
1003 }
1004 }
1005
1006 Ok(file_groups_indices
1008 .into_iter()
1009 .map(|file_group_indices| {
1010 file_group_indices
1011 .into_iter()
1012 .map(|idx| flattened_files[idx].clone())
1013 .collect()
1014 })
1015 .collect())
1016 }
1017
1018 #[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")]
1020 pub fn build(self) -> Arc<DataSourceExec> {
1021 DataSourceExec::from_data_source(self)
1022 }
1023
1024 fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1026 write!(f, ", file_type={}", self.file_source.file_type())?;
1027 self.file_source.fmt_extra(t, f)
1028 }
1029
1030 pub fn file_source(&self) -> &Arc<dyn FileSource> {
1032 &self.file_source
1033 }
1034}
1035
1036impl Debug for FileScanConfig {
1037 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1038 write!(f, "FileScanConfig {{")?;
1039 write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1040
1041 write!(
1042 f,
1043 "statistics={:?}, ",
1044 self.file_source.statistics().unwrap()
1045 )?;
1046
1047 DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1048 write!(f, "}}")
1049 }
1050}
1051
1052impl DisplayAs for FileScanConfig {
1053 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1054 let (schema, _, _, orderings) = self.project();
1055
1056 write!(f, "file_groups=")?;
1057 FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1058
1059 if !schema.fields().is_empty() {
1060 write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1061 }
1062
1063 if let Some(limit) = self.limit {
1064 write!(f, ", limit={limit}")?;
1065 }
1066
1067 display_orderings(f, &orderings)?;
1068
1069 if !self.constraints.is_empty() {
1070 write!(f, ", {}", self.constraints)?;
1071 }
1072
1073 Ok(())
1074 }
1075}
1076
1077pub struct PartitionColumnProjector {
1084 key_buffer_cache: ZeroBufferGenerators,
1088 projected_partition_indexes: Vec<(usize, usize)>,
1092 projected_schema: SchemaRef,
1094}
1095
1096impl PartitionColumnProjector {
1097 pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1101 let mut idx_map = HashMap::new();
1102 for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1103 if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1104 idx_map.insert(partition_idx, schema_idx);
1105 }
1106 }
1107
1108 let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1109 projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1110
1111 Self {
1112 projected_partition_indexes,
1113 key_buffer_cache: Default::default(),
1114 projected_schema,
1115 }
1116 }
1117
1118 pub fn project(
1123 &mut self,
1124 file_batch: RecordBatch,
1125 partition_values: &[ScalarValue],
1126 ) -> Result<RecordBatch> {
1127 let expected_cols =
1128 self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1129
1130 if file_batch.columns().len() != expected_cols {
1131 return exec_err!(
1132 "Unexpected batch schema from file, expected {} cols but got {}",
1133 expected_cols,
1134 file_batch.columns().len()
1135 );
1136 }
1137
1138 let mut cols = file_batch.columns().to_vec();
1139 for &(pidx, sidx) in &self.projected_partition_indexes {
1140 let p_value =
1141 partition_values
1142 .get(pidx)
1143 .ok_or(DataFusionError::Execution(
1144 "Invalid partitioning found on disk".to_string(),
1145 ))?;
1146
1147 let mut partition_value = Cow::Borrowed(p_value);
1148
1149 let field = self.projected_schema.field(sidx);
1151 let expected_data_type = field.data_type();
1152 let actual_data_type = partition_value.data_type();
1153 if let DataType::Dictionary(key_type, _) = expected_data_type {
1154 if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1155 warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1156 partition_value = Cow::Owned(ScalarValue::Dictionary(
1157 key_type.clone(),
1158 Box::new(partition_value.as_ref().clone()),
1159 ));
1160 }
1161 }
1162
1163 cols.insert(
1164 sidx,
1165 create_output_array(
1166 &mut self.key_buffer_cache,
1167 partition_value.as_ref(),
1168 file_batch.num_rows(),
1169 )?,
1170 )
1171 }
1172
1173 RecordBatch::try_new_with_options(
1174 Arc::clone(&self.projected_schema),
1175 cols,
1176 &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1177 )
1178 .map_err(Into::into)
1179 }
1180}
1181
1182#[derive(Debug, Default)]
1183struct ZeroBufferGenerators {
1184 gen_i8: ZeroBufferGenerator<i8>,
1185 gen_i16: ZeroBufferGenerator<i16>,
1186 gen_i32: ZeroBufferGenerator<i32>,
1187 gen_i64: ZeroBufferGenerator<i64>,
1188 gen_u8: ZeroBufferGenerator<u8>,
1189 gen_u16: ZeroBufferGenerator<u16>,
1190 gen_u32: ZeroBufferGenerator<u32>,
1191 gen_u64: ZeroBufferGenerator<u64>,
1192}
1193
1194#[derive(Debug, Default)]
1196struct ZeroBufferGenerator<T>
1197where
1198 T: ArrowNativeType,
1199{
1200 cache: Option<Buffer>,
1201 _t: PhantomData<T>,
1202}
1203
1204impl<T> ZeroBufferGenerator<T>
1205where
1206 T: ArrowNativeType,
1207{
1208 const SIZE: usize = size_of::<T>();
1209
1210 fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1211 match &mut self.cache {
1212 Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1213 buf.slice_with_length(0, n_vals * Self::SIZE)
1214 }
1215 _ => {
1216 let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1217 key_buffer_builder.advance(n_vals); self.cache.insert(key_buffer_builder.finish()).clone()
1219 }
1220 }
1221 }
1222}
1223
1224fn create_dict_array<T>(
1225 buffer_gen: &mut ZeroBufferGenerator<T>,
1226 dict_val: &ScalarValue,
1227 len: usize,
1228 data_type: DataType,
1229) -> Result<ArrayRef>
1230where
1231 T: ArrowNativeType,
1232{
1233 let dict_vals = dict_val.to_array()?;
1234
1235 let sliced_key_buffer = buffer_gen.get_buffer(len);
1236
1237 let mut builder = ArrayData::builder(data_type)
1239 .len(len)
1240 .add_buffer(sliced_key_buffer);
1241 builder = builder.add_child_data(dict_vals.to_data());
1242 Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1243 builder.build().unwrap(),
1244 )))
1245}
1246
1247fn create_output_array(
1248 key_buffer_cache: &mut ZeroBufferGenerators,
1249 val: &ScalarValue,
1250 len: usize,
1251) -> Result<ArrayRef> {
1252 if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1253 match key_type.as_ref() {
1254 DataType::Int8 => {
1255 return create_dict_array(
1256 &mut key_buffer_cache.gen_i8,
1257 dict_val,
1258 len,
1259 val.data_type(),
1260 );
1261 }
1262 DataType::Int16 => {
1263 return create_dict_array(
1264 &mut key_buffer_cache.gen_i16,
1265 dict_val,
1266 len,
1267 val.data_type(),
1268 );
1269 }
1270 DataType::Int32 => {
1271 return create_dict_array(
1272 &mut key_buffer_cache.gen_i32,
1273 dict_val,
1274 len,
1275 val.data_type(),
1276 );
1277 }
1278 DataType::Int64 => {
1279 return create_dict_array(
1280 &mut key_buffer_cache.gen_i64,
1281 dict_val,
1282 len,
1283 val.data_type(),
1284 );
1285 }
1286 DataType::UInt8 => {
1287 return create_dict_array(
1288 &mut key_buffer_cache.gen_u8,
1289 dict_val,
1290 len,
1291 val.data_type(),
1292 );
1293 }
1294 DataType::UInt16 => {
1295 return create_dict_array(
1296 &mut key_buffer_cache.gen_u16,
1297 dict_val,
1298 len,
1299 val.data_type(),
1300 );
1301 }
1302 DataType::UInt32 => {
1303 return create_dict_array(
1304 &mut key_buffer_cache.gen_u32,
1305 dict_val,
1306 len,
1307 val.data_type(),
1308 );
1309 }
1310 DataType::UInt64 => {
1311 return create_dict_array(
1312 &mut key_buffer_cache.gen_u64,
1313 dict_val,
1314 len,
1315 val.data_type(),
1316 );
1317 }
1318 _ => {}
1319 }
1320 }
1321
1322 val.to_array_of_size(len)
1323}
1324
1325fn get_projected_output_ordering(
1385 base_config: &FileScanConfig,
1386 projected_schema: &SchemaRef,
1387) -> Vec<LexOrdering> {
1388 let mut all_orderings = vec![];
1389 for output_ordering in &base_config.output_ordering {
1390 let mut new_ordering = LexOrdering::default();
1391 for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1392 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1393 let name = col.name();
1394 if let Some((idx, _)) = projected_schema.column_with_name(name) {
1395 new_ordering.push(PhysicalSortExpr {
1397 expr: Arc::new(Column::new(name, idx)),
1398 options: *options,
1399 });
1400 continue;
1401 }
1402 }
1403 break;
1406 }
1407
1408 if new_ordering.is_empty() {
1411 continue;
1412 }
1413
1414 if base_config.file_groups.iter().any(|group| {
1416 if group.len() <= 1 {
1417 return false;
1419 }
1420
1421 let statistics = match MinMaxStatistics::new_from_files(
1422 &new_ordering,
1423 projected_schema,
1424 base_config.projection.as_deref(),
1425 group.iter(),
1426 ) {
1427 Ok(statistics) => statistics,
1428 Err(e) => {
1429 log::trace!("Error fetching statistics for file group: {e}");
1430 return true;
1432 }
1433 };
1434
1435 !statistics.is_sorted()
1436 }) {
1437 debug!(
1438 "Skipping specified output ordering {:?}. \
1439 Some file groups couldn't be determined to be sorted: {:?}",
1440 base_config.output_ordering[0], base_config.file_groups
1441 );
1442 continue;
1443 }
1444
1445 all_orderings.push(new_ordering);
1446 }
1447 all_orderings
1448}
1449
1450pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1461 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1462}
1463
1464pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1468 ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473 use crate::{
1474 generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1475 verify_sort_integrity,
1476 };
1477
1478 use super::*;
1479 use arrow::{
1480 array::{Int32Array, RecordBatch},
1481 compute::SortOptions,
1482 };
1483
1484 use datafusion_common::stats::Precision;
1485 use datafusion_common::{assert_batches_eq, DFSchema};
1486 use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
1487 use datafusion_physical_expr::create_physical_expr;
1488 use std::collections::HashMap;
1489
1490 fn create_physical_sort_expr(
1491 e: &SortExpr,
1492 input_dfschema: &DFSchema,
1493 execution_props: &ExecutionProps,
1494 ) -> Result<PhysicalSortExpr> {
1495 let SortExpr {
1496 expr,
1497 asc,
1498 nulls_first,
1499 } = e;
1500 Ok(PhysicalSortExpr {
1501 expr: create_physical_expr(expr, input_dfschema, execution_props)?,
1502 options: SortOptions {
1503 descending: !asc,
1504 nulls_first: *nulls_first,
1505 },
1506 })
1507 }
1508
1509 pub fn columns(schema: &Schema) -> Vec<String> {
1511 schema.fields().iter().map(|f| f.name().clone()).collect()
1512 }
1513
/// Without a projection, the projected schema and statistics cover every
/// file column plus the partition column, and the file-column helpers
/// report `None`.
#[test]
fn physical_plan_config_no_projection() {
    let file_schema = aggr_test_schema();
    let n_file_columns = file_schema.fields().len();

    let partition_cols = to_partition_cols(vec![(
        "date".to_owned(),
        wrap_partition_type_in_dict(DataType::Utf8),
    )]);
    let conf = config_for_projection(
        Arc::clone(&file_schema),
        None,
        Statistics::new_unknown(&file_schema),
        partition_cols,
    );

    let (proj_schema, _, proj_statistics, _) = conf.project();
    assert_eq!(proj_schema.fields().len(), n_file_columns + 1);
    assert_eq!(
        proj_schema.field(n_file_columns).name(),
        "date",
        "partition columns are the last columns"
    );
    assert_eq!(proj_statistics.column_statistics.len(), n_file_columns + 1);

    // No projection was set, so there is nothing to report per file column.
    assert_eq!(conf.projected_file_column_names(), None);
    assert_eq!(conf.file_column_projection_indices(), None);
}
1546
/// A partition column supplied as a full `Field` (including metadata) must
/// come out of the projection unchanged, as the last column.
#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
    let file_schema = aggr_test_schema();

    // Metadata is attached to verify that the whole field is carried over,
    // not just its name and type.
    let table_partition_col =
        Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
            .with_metadata(HashMap::from_iter(vec![(
                "key_whatever".to_owned(),
                "value_whatever".to_owned(),
            )]));

    let conf = config_for_projection(
        Arc::clone(&file_schema),
        None,
        Statistics::new_unknown(&file_schema),
        vec![table_partition_col.clone()],
    );

    let (proj_schema, _, _, _) = conf.project();
    assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
    assert_eq!(
        *proj_schema.field(file_schema.fields().len()),
        table_partition_col,
        "partition columns are the last columns and must have all values defined in created field"
    );
}
1575
/// Projecting `[partition column, file column 0]` yields the schema
/// `["date", "c1"]`, carries the file column's statistics into slot 1, and
/// reports only the file column in the file-column projection accessors.
#[test]
fn physical_plan_config_with_projection() {
    let file_schema = aggr_test_schema();
    let file_column_count = file_schema.fields().len();

    // File column `i` gets an inexact distinct count of `i`, so a projected
    // statistic can be traced back to the column it came from.
    let column_statistics: Vec<ColumnStatistics> = (0..file_column_count)
        .map(|i| ColumnStatistics {
            distinct_count: Precision::Inexact(i),
            ..Default::default()
        })
        .collect();
    let statistics = Statistics {
        num_rows: Precision::Inexact(10),
        total_byte_size: Precision::Absent,
        column_statistics,
    };

    let partition_cols = to_partition_cols(vec![(
        "date".to_owned(),
        wrap_partition_type_in_dict(DataType::Utf8),
    )]);

    // Index `file_column_count` addresses the partition column; 0 is `c1`.
    let conf = config_for_projection(
        Arc::clone(&file_schema),
        Some(vec![file_column_count, 0]),
        statistics,
        partition_cols,
    );

    let (proj_schema, _, proj_statistics, _) = conf.project();
    assert_eq!(
        columns(&proj_schema),
        vec!["date".to_owned(), "c1".to_owned()]
    );

    let proj_stat_cols = proj_statistics.column_statistics;
    assert_eq!(proj_stat_cols.len(), 2);
    assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));

    assert_eq!(
        conf.projected_file_column_names(),
        Some(vec!["c1".to_owned()])
    );
    assert_eq!(conf.file_column_projection_indices(), Some(vec![0]));
}
1617
/// Drives one `PartitionColumnProjector` across several file batches and
/// checks that the projected `year` and `day` partition values are appended
/// to each batch. The last batch passes plain (non dictionary-wrapped)
/// partition values.
#[test]
fn partition_column_projector() {
    // Dictionary-wrapped values for the (year, month, day) partition columns;
    // only the day differs between the batches below.
    let dict_partition_values = |day: &str| {
        [
            wrap_partition_value_in_dict(ScalarValue::from("2021")),
            wrap_partition_value_in_dict(ScalarValue::from("10")),
            wrap_partition_value_in_dict(ScalarValue::from(day)),
        ]
    };

    let file_batch = build_table_i32(
        ("a", &vec![0, 1, 2]),
        ("b", &vec![-2, -1, 0]),
        ("c", &vec![10, 11, 12]),
    );
    let partition_cols = vec![
        (
            "year".to_owned(),
            wrap_partition_type_in_dict(DataType::Utf8),
        ),
        (
            "month".to_owned(),
            wrap_partition_type_in_dict(DataType::Utf8),
        ),
        (
            "day".to_owned(),
            wrap_partition_type_in_dict(DataType::Utf8),
        ),
    ];
    let statistics = Statistics {
        num_rows: Precision::Inexact(3),
        total_byte_size: Precision::Absent,
        column_statistics: Statistics::unknown_column(&file_batch.schema()),
    };

    let conf = config_for_projection(
        file_batch.schema(),
        // The three file columns, then `year` (first partition column) and
        // `day` (third partition column); `month` is not projected.
        Some(vec![
            0,
            1,
            2,
            file_batch.schema().fields().len(),
            file_batch.schema().fields().len() + 2,
        ]),
        statistics.clone(),
        to_partition_cols(partition_cols.clone()),
    );

    let source_statistics = conf.file_source.statistics().unwrap();
    let conf_stats = conf.statistics().unwrap();

    assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

    // The config-level statistics cover all five projected columns ...
    assert_eq!(conf_stats.column_statistics.len(), 5);

    // ... while the source keeps the statistics it was given (file columns only).
    assert_eq!(source_statistics, statistics);
    assert_eq!(source_statistics.column_statistics.len(), 3);

    let (proj_schema, ..) = conf.project();
    let mut proj = PartitionColumnProjector::new(
        proj_schema,
        &partition_cols
            .iter()
            .map(|x| x.0.clone())
            .collect::<Vec<_>>(),
    );

    // Expected tables are padded to the column widths of their border lines,
    // because `assert_batches_eq!` compares the rendered lines exactly.
    let projected_batch = proj
        .project(file_batch, &dict_partition_values("26"))
        .expect("Projection of partition columns into record batch failed");
    let expected = [
        "+---+----+----+------+-----+",
        "| a | b  | c  | year | day |",
        "+---+----+----+------+-----+",
        "| 0 | -2 | 10 | 2021 | 26  |",
        "| 1 | -1 | 11 | 2021 | 26  |",
        "| 2 | 0  | 12 | 2021 | 26  |",
        "+---+----+----+------+-----+",
    ];
    assert_batches_eq!(expected, &[projected_batch]);

    // A batch with more rows than the first one.
    let file_batch = build_table_i32(
        ("a", &vec![5, 6, 7, 8, 9]),
        ("b", &vec![-10, -9, -8, -7, -6]),
        ("c", &vec![12, 13, 14, 15, 16]),
    );
    let projected_batch = proj
        .project(file_batch, &dict_partition_values("27"))
        .expect("Projection of partition columns into record batch failed");
    let expected = [
        "+---+-----+----+------+-----+",
        "| a | b   | c  | year | day |",
        "+---+-----+----+------+-----+",
        "| 5 | -10 | 12 | 2021 | 27  |",
        "| 6 | -9  | 13 | 2021 | 27  |",
        "| 7 | -8  | 14 | 2021 | 27  |",
        "| 8 | -7  | 15 | 2021 | 27  |",
        "| 9 | -6  | 16 | 2021 | 27  |",
        "+---+-----+----+------+-----+",
    ];
    assert_batches_eq!(expected, &[projected_batch]);

    // A batch with fewer rows than the previous one.
    let file_batch = build_table_i32(
        ("a", &vec![0, 1, 3]),
        ("b", &vec![2, 3, 4]),
        ("c", &vec![4, 5, 6]),
    );
    let projected_batch = proj
        .project(file_batch, &dict_partition_values("28"))
        .expect("Projection of partition columns into record batch failed");
    let expected = [
        "+---+---+---+------+-----+",
        "| a | b | c | year | day |",
        "+---+---+---+------+-----+",
        "| 0 | 2 | 4 | 2021 | 28  |",
        "| 1 | 3 | 5 | 2021 | 28  |",
        "| 3 | 4 | 6 | 2021 | 28  |",
        "+---+---+---+------+-----+",
    ];
    assert_batches_eq!(expected, &[projected_batch]);

    // Partition values that are NOT dictionary-wrapped must still project.
    let file_batch = build_table_i32(
        ("a", &vec![0, 1, 2]),
        ("b", &vec![-2, -1, 0]),
        ("c", &vec![10, 11, 12]),
    );
    let projected_batch = proj
        .project(
            file_batch,
            &[
                ScalarValue::from("2021"),
                ScalarValue::from("10"),
                ScalarValue::from("26"),
            ],
        )
        .expect("Projection of partition columns into record batch failed");
    let expected = [
        "+---+----+----+------+-----+",
        "| a | b  | c  | year | day |",
        "+---+----+----+------+-----+",
        "| 0 | -2 | 10 | 2021 | 26  |",
        "| 1 | -1 | 11 | 2021 | 26  |",
        "| 2 | 0  | 12 | 2021 | 26  |",
        "+---+----+----+------+-----+",
    ];
    assert_batches_eq!(expected, &[projected_batch]);
}
1792
/// `projected_file_schema` keeps only the projected file columns: the
/// projected partition column index does not show up in the result.
#[test]
fn test_projected_file_schema_with_partition_col() {
    let schema = aggr_test_schema();
    let partition_cols: Vec<(String, DataType)> = ["part1", "part2"]
        .iter()
        .map(|name| {
            (
                name.to_string(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )
        })
        .collect();

    // 0, 3 and 5 are file columns; `schema.fields().len()` addresses `part1`.
    let projected_indices = vec![0, 3, 5, schema.fields().len()];
    let conf = config_for_projection(
        Arc::clone(&schema),
        Some(projected_indices),
        Statistics::new_unknown(&schema),
        to_partition_cols(partition_cols),
    );
    let projection = conf.projected_file_schema();

    let mut actual_columns = Vec::new();
    for field in projection.fields().iter() {
        actual_columns.push(field.name().clone());
    }
    let expected_columns = vec!["c1", "c4", "c6"];
    assert_eq!(expected_columns, actual_columns);
}
1825
/// With no projection, `projected_file_schema` has exactly the file
/// schema's fields; the partition columns are not part of it.
#[test]
fn test_projected_file_schema_without_projection() {
    let schema = aggr_test_schema();
    let partition_cols: Vec<(String, DataType)> = ["part1", "part2"]
        .iter()
        .map(|name| {
            (
                name.to_string(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )
        })
        .collect();

    let conf = config_for_projection(
        Arc::clone(&schema),
        None,
        Statistics::new_unknown(&schema),
        to_partition_cols(partition_cols),
    );
    let projection = conf.projected_file_schema();

    assert_eq!(projection.fields(), schema.fields());
}
1852
/// Table-driven test for `FileScanConfig::split_groups_by_statistics`.
///
/// Each case lists files with per-column `(min, max)` statistics and a sort
/// order, and expects either the resulting groups (as lists of file base
/// names) or the exact error message with its backtrace stripped.
#[test]
fn test_split_groups_by_statistics() -> Result<()> {
    use chrono::TimeZone;
    use datafusion_common::DFSchema;
    use datafusion_expr::{col, execution_props::ExecutionProps};
    use object_store::{path::Path, ObjectMeta};

    /// Test input file: base name, `date` partition value, and optional
    /// `(min, max)` statistics per file column.
    struct File {
        name: &'static str,
        date: &'static str,
        statistics: Vec<Option<(f64, f64)>>,
    }

    impl File {
        fn new(
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(f64, f64)>>,
        ) -> Self {
            Self {
                name,
                date,
                statistics,
            }
        }
    }

    /// Builds the `PartitionedFile` stored at `data/date=<date>/<name>.parquet`.
    impl From<File> for PartitionedFile {
        fn from(file: File) -> Self {
            let location =
                Path::from(format!("data/date={}/{}.parquet", file.date, file.name));

            // A column without statistics becomes a default `ColumnStatistics`.
            let column_statistics = file
                .statistics
                .into_iter()
                .map(|stats| match stats {
                    Some((min, max)) => ColumnStatistics {
                        min_value: Precision::Exact(ScalarValue::from(min)),
                        max_value: Precision::Exact(ScalarValue::from(max)),
                        ..Default::default()
                    },
                    None => ColumnStatistics::default(),
                })
                .collect::<Vec<_>>();

            PartitionedFile {
                object_meta: ObjectMeta {
                    location,
                    last_modified: chrono::Utc.timestamp_nanos(0),
                    size: 0,
                    e_tag: None,
                    version: None,
                },
                partition_values: vec![ScalarValue::from(file.date)],
                range: None,
                statistics: Some(Arc::new(Statistics {
                    num_rows: Precision::Absent,
                    total_byte_size: Precision::Absent,
                    column_statistics,
                })),
                extensions: None,
                metadata_size_hint: None,
            }
        }
    }

    struct TestCase {
        name: &'static str,
        file_schema: Schema,
        files: Vec<File>,
        sort: Vec<SortExpr>,
        expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
    }

    /// File schema with a single `Float64` column named `value`.
    fn value_schema(nullable: bool) -> Schema {
        Schema::new(vec![Field::new(
            "value".to_string(),
            DataType::Float64,
            nullable,
        )])
    }

    let cases = vec![
        TestCase {
            name: "test sort",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
        },
        TestCase {
            // Same files as "test sort", listed in a different order.
            name: "test sort with files ordered differently",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
        },
        TestCase {
            name: "reverse sort",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
            ],
            sort: vec![col("value").sort(false, true)],
            expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
        },
        TestCase {
            // The only case whose `value` column is nullable; it must error.
            name: "no nullable sort columns",
            file_schema: value_schema(true),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column"),
        },
        TestCase {
            name: "all three non-overlapping",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Ok(vec![vec!["0", "1", "2"]]),
        },
        TestCase {
            name: "all three overlapping",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
        },
        TestCase {
            name: "empty input",
            file_schema: value_schema(false),
            files: vec![],
            sort: vec![col("value").sort(true, false)],
            expected_result: Ok(vec![]),
        },
        TestCase {
            name: "one file missing statistics",
            file_schema: value_schema(false),
            files: vec![
                File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                File::new("2", "2023-01-02", vec![None]),
            ],
            sort: vec![col("value").sort(true, false)],
            expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
        },
    ];

    for case in cases {
        let TestCase {
            name,
            file_schema,
            files,
            sort,
            expected_result,
        } = case;

        // Table schema = file columns followed by the `date` partition column.
        let mut table_fields: Vec<_> = file_schema.fields().iter().cloned().collect();
        table_fields.push(Arc::new(Field::new(
            "date".to_string(),
            DataType::Utf8,
            false,
        )));
        let table_schema = Arc::new(Schema::new(table_fields));

        let physical_sort_exprs = sort
            .iter()
            .map(|expr| {
                let df_schema = DFSchema::try_from(table_schema.as_ref().clone())?;
                create_physical_sort_expr(expr, &df_schema, &ExecutionProps::default())
            })
            .collect::<Result<Vec<_>>>()?;
        let sort_order = LexOrdering::from(physical_sort_exprs);

        let partitioned_files =
            FileGroup::new(files.into_iter().map(PartitionedFile::from).collect());
        let result = FileScanConfig::split_groups_by_statistics(
            &table_schema,
            &[partitioned_files.clone()],
            &sort_order,
        );

        // Translate each returned file into the base name ("0", "1", ...) of
        // the input file with the same object metadata; errors become their
        // backtrace-free message.
        let results_by_name = result
            .as_ref()
            .map(|file_groups| {
                file_groups
                    .iter()
                    .map(|file_group| {
                        file_group
                            .iter()
                            .map(|file| {
                                let matching = partitioned_files
                                    .iter()
                                    .find(|candidate| {
                                        candidate.object_meta == file.object_meta
                                    })
                                    .unwrap();
                                matching
                                    .object_meta
                                    .location
                                    .as_ref()
                                    .rsplit('/')
                                    .next()
                                    .unwrap()
                                    .trim_end_matches(".parquet")
                            })
                            .collect::<Vec<_>>()
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(|e| e.strip_backtrace().leak() as &'static str);

        assert_eq!(results_by_name, expected_result, "{}", name);
    }

    Ok(())
}
2126
2127 fn config_for_projection(
2129 file_schema: SchemaRef,
2130 projection: Option<Vec<usize>>,
2131 statistics: Statistics,
2132 table_partition_cols: Vec<Field>,
2133 ) -> FileScanConfig {
2134 FileScanConfigBuilder::new(
2135 ObjectStoreUrl::parse("test:///").unwrap(),
2136 file_schema,
2137 Arc::new(MockSource::default()),
2138 )
2139 .with_projection(projection)
2140 .with_statistics(statistics)
2141 .with_table_partition_cols(table_partition_cols)
2142 .build()
2143 }
2144
2145 fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2147 table_partition_cols
2148 .iter()
2149 .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2150 .collect::<Vec<_>>()
2151 }
2152
2153 pub fn build_table_i32(
2155 a: (&str, &Vec<i32>),
2156 b: (&str, &Vec<i32>),
2157 c: (&str, &Vec<i32>),
2158 ) -> RecordBatch {
2159 let schema = Schema::new(vec![
2160 Field::new(a.0, DataType::Int32, false),
2161 Field::new(b.0, DataType::Int32, false),
2162 Field::new(c.0, DataType::Int32, false),
2163 ]);
2164
2165 RecordBatch::try_new(
2166 Arc::new(schema),
2167 vec![
2168 Arc::new(Int32Array::from(a.1.clone())),
2169 Arc::new(Int32Array::from(b.1.clone())),
2170 Arc::new(Int32Array::from(c.1.clone())),
2171 ],
2172 )
2173 .unwrap()
2174 }
2175
/// Every value set through a `FileScanConfigBuilder` setter is visible on
/// the built `FileScanConfig`.
#[test]
fn test_file_scan_config_builder() {
    let file_schema = aggr_test_schema();
    let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
    let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

    let partition_col = Field::new(
        "date",
        wrap_partition_type_in_dict(DataType::Utf8),
        false,
    );
    let file_group = FileGroup::new(vec![PartitionedFile::new(
        "test.parquet".to_string(),
        1024,
    )]);

    let config = FileScanConfigBuilder::new(
        object_store_url.clone(),
        Arc::clone(&file_schema),
        Arc::clone(&file_source),
    )
    .with_limit(Some(1000))
    .with_projection(Some(vec![0, 1]))
    .with_table_partition_cols(vec![partition_col])
    .with_constraints(Constraints::empty())
    .with_statistics(Statistics::new_unknown(&file_schema))
    .with_file_groups(vec![file_group])
    .with_output_ordering(vec![LexOrdering::default()])
    .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
    .with_newlines_in_values(true)
    .build();

    // Constructor arguments.
    assert_eq!(config.object_store_url, object_store_url);
    assert_eq!(config.file_schema, file_schema);

    // Values supplied through the setters.
    assert_eq!(config.limit, Some(1000));
    assert_eq!(config.projection, Some(vec![0, 1]));
    assert_eq!(config.table_partition_cols.len(), 1);
    assert_eq!(config.table_partition_cols[0].name(), "date");

    assert_eq!(config.file_groups.len(), 1);
    let only_group = &config.file_groups[0];
    assert_eq!(only_group.len(), 1);
    assert_eq!(only_group[0].object_meta.location.as_ref(), "test.parquet");

    assert_eq!(
        config.file_compression_type,
        FileCompressionType::UNCOMPRESSED
    );
    assert!(config.new_lines_in_values);
    assert_eq!(config.output_ordering.len(), 1);
}
2229
/// A config built from only the required constructor arguments has empty
/// or absent values for everything else, and the file source reports
/// absent statistics for each file column.
#[test]
fn test_file_scan_config_builder_defaults() {
    let file_schema = aggr_test_schema();
    let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
    let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());

    let builder = FileScanConfigBuilder::new(
        object_store_url.clone(),
        Arc::clone(&file_schema),
        Arc::clone(&file_source),
    );
    let config = builder.build();

    // Constructor arguments are carried through unchanged.
    assert_eq!(config.object_store_url, object_store_url);
    assert_eq!(config.file_schema, file_schema);

    // Everything optional is left at its default.
    assert_eq!(config.limit, None);
    assert_eq!(config.projection, None);
    assert!(config.table_partition_cols.is_empty());
    assert!(config.file_groups.is_empty());
    assert_eq!(
        config.file_compression_type,
        FileCompressionType::UNCOMPRESSED
    );
    assert!(!config.new_lines_in_values);
    assert!(config.output_ordering.is_empty());
    assert!(config.constraints.is_empty());

    // Source statistics: one entry per file column, all values absent.
    let source_statistics = config.file_source.statistics().unwrap();
    assert_eq!(source_statistics.num_rows, Precision::Absent);
    assert_eq!(source_statistics.total_byte_size, Precision::Absent);
    assert_eq!(
        source_statistics.column_statistics.len(),
        file_schema.fields().len()
    );
    for stat in &source_statistics.column_statistics {
        assert_eq!(stat.distinct_count, Precision::Absent);
        assert_eq!(stat.min_value, Precision::Absent);
        assert_eq!(stat.max_value, Precision::Absent);
        assert_eq!(stat.null_count, Precision::Absent);
    }
}
2284
/// Converting a built `FileScanConfig` back into a builder with
/// `FileScanConfigBuilder::from` and rebuilding keeps the configured values.
#[test]
fn test_file_scan_config_builder_new_from() {
    let schema = aggr_test_schema();
    let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
    let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
    let partition_cols = vec![Field::new(
        "date",
        wrap_partition_type_in_dict(DataType::Utf8),
        false,
    )];

    let original_config = FileScanConfigBuilder::new(
        object_store_url.clone(),
        Arc::clone(&schema),
        Arc::clone(&file_source),
    )
    .with_projection(Some(vec![0, 2]))
    .with_limit(Some(10))
    .with_table_partition_cols(partition_cols.clone())
    .with_file(PartitionedFile::new("test_file.parquet", 100))
    .with_constraints(Constraints::default())
    .with_newlines_in_values(true)
    .build();

    // Round-trip: config -> builder -> config.
    let new_config = FileScanConfigBuilder::from(original_config).build();

    assert_eq!(new_config.object_store_url, object_store_url);
    assert_eq!(new_config.file_schema, schema);
    assert_eq!(new_config.projection, Some(vec![0, 2]));
    assert_eq!(new_config.limit, Some(10));
    assert_eq!(new_config.table_partition_cols, partition_cols);

    assert_eq!(new_config.file_groups.len(), 1);
    let only_group = &new_config.file_groups[0];
    assert_eq!(only_group.len(), 1);
    assert_eq!(
        only_group[0].object_meta.location.as_ref(),
        "test_file.parquet"
    );

    assert_eq!(new_config.constraints, Constraints::default());
    assert!(new_config.new_lines_in_values);
}
2332
/// Table-driven test for
/// `FileScanConfig::split_groups_by_statistics_with_target_partitions`.
///
/// For each case it checks the number of resulting partitions, that
/// `verify_sort_integrity` accepts the result, and (when there is more than
/// one file and more than one expected partition) that no partition holds
/// twice the average number of files or more. It finally checks that a
/// target of zero partitions is rejected with an error.
#[test]
fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
    use datafusion_common::DFSchema;
    use datafusion_expr::{col, execution_props::ExecutionProps};

    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Float64,
        false,
    )]));

    let exec_props = ExecutionProps::new();
    let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
    let sort_expr = vec![col("value").sort(true, false)];

    // Propagate planning errors with `?` rather than panicking: this test
    // already returns `Result`.
    let physical_sort_exprs = sort_expr
        .iter()
        .map(|expr| create_physical_sort_expr(expr, &df_schema, &exec_props))
        .collect::<Result<Vec<_>>>()?;

    let sort_ordering = LexOrdering::from(physical_sort_exprs);

    struct TestCase {
        name: String,
        file_count: usize,
        // Forwarded to `generate_test_files` as its second argument.
        overlap_factor: f64,
        target_partitions: usize,
        expected_partition_count: usize,
    }

    let test_cases = vec![
        TestCase {
            name: "no_overlap_10_files_4_partitions".to_string(),
            file_count: 10,
            overlap_factor: 0.0,
            target_partitions: 4,
            expected_partition_count: 4,
        },
        TestCase {
            name: "medium_overlap_20_files_5_partitions".to_string(),
            file_count: 20,
            overlap_factor: 0.5,
            target_partitions: 5,
            expected_partition_count: 5,
        },
        TestCase {
            // The expected count (7) is larger than the target (3) here.
            name: "high_overlap_30_files_3_partitions".to_string(),
            file_count: 30,
            overlap_factor: 0.8,
            target_partitions: 3,
            expected_partition_count: 7,
        },
        TestCase {
            name: "fewer_files_than_partitions".to_string(),
            file_count: 3,
            overlap_factor: 0.0,
            target_partitions: 10,
            expected_partition_count: 3,
        },
        TestCase {
            name: "single_file".to_string(),
            file_count: 1,
            overlap_factor: 0.0,
            target_partitions: 5,
            expected_partition_count: 1,
        },
        TestCase {
            name: "empty_files".to_string(),
            file_count: 0,
            overlap_factor: 0.0,
            target_partitions: 3,
            expected_partition_count: 0,
        },
    ];

    for case in test_cases {
        println!("Running test case: {}", case.name);

        let file_groups = generate_test_files(case.file_count, case.overlap_factor);

        let result = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &file_groups,
            &sort_ordering,
            case.target_partitions,
        )?;

        println!(
            "Created {} partitions (target was {})",
            result.len(),
            case.target_partitions
        );

        assert_eq!(
            result.len(),
            case.expected_partition_count,
            "Case '{}': Unexpected partition count",
            case.name
        );

        assert!(
            verify_sort_integrity(&result),
            "Case '{}': Files within partitions are not properly ordered",
            case.name
        );

        // The balance check is skipped for single-file and single-partition cases.
        if case.file_count > 1 && case.expected_partition_count > 1 {
            let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
            let max_size = *group_sizes.iter().max().unwrap();
            let min_size = *group_sizes.iter().min().unwrap();

            let avg_files_per_partition =
                case.file_count as f64 / case.expected_partition_count as f64;
            assert!(
                (max_size as f64) < 2.0 * avg_files_per_partition,
                "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                case.name,
                max_size,
                avg_files_per_partition
            );

            println!(
                "Distribution - min files: {}, max files: {}",
                min_size, max_size
            );
        }
    }

    // A target of zero partitions must be reported as an error.
    let empty_groups: Vec<FileGroup> = vec![];
    let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
        &schema,
        &empty_groups,
        &sort_ordering,
        0,
    )
    .unwrap_err();

    assert!(
        err.to_string()
            .contains("target_partitions must be greater than 0"),
        "Expected error for zero target partitions"
    );

    Ok(())
}
2491}