use std::{
    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};

use arrow::{
    array::{
        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
        RecordBatchOptions,
    },
    buffer::Buffer,
    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_physical_expr::{
    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
    PhysicalSortExpr,
};
use datafusion_physical_plan::{
    display::{display_orderings, ProjectSchemaDisplay},
    metrics::ExecutionPlanMetricsSet,
    projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
    DisplayAs, DisplayFormatType, ExecutionPlan,
};
use log::{debug, warn};

use crate::{
    display::FileGroupsDisplay,
    file::FileSource,
    file_compression_type::FileCompressionType,
    file_stream::FileStream,
    source::{DataSource, DataSourceExec},
    statistics::MinMaxStatistics,
    PartitionedFile,
};

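/// Configuration for scanning a collection of files as a single data source.
///
/// Describes which object store to read from, the schema of the files, the
/// file groups to scan (one group per output partition), and optional
/// projection, limit, output orderings, compression and table partition
/// columns. A config is created with [`FileScanConfig::new`], refined with
/// the `with_*` builder methods, and turned into an execution plan node via
/// [`FileScanConfig::build`].
///
/// A minimal usage sketch (the `object_store_url`, `file_schema`,
/// `file_source` and `partitioned_file` values are assumed to exist):
///
/// ```ignore
/// let exec = FileScanConfig::new(object_store_url, file_schema, file_source)
///     .with_projection(Some(vec![0, 2]))
///     .with_limit(Some(1_000))
///     .with_file(partitioned_file)
///     .build();
/// ```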
#[derive(Clone)]
pub struct FileScanConfig {
    pub object_store_url: ObjectStoreUrl,
    pub file_schema: SchemaRef,
    pub file_groups: Vec<Vec<PartitionedFile>>,
    pub constraints: Constraints,
    pub statistics: Statistics,
    pub projection: Option<Vec<usize>>,
    pub limit: Option<usize>,
    pub table_partition_cols: Vec<Field>,
    pub output_ordering: Vec<LexOrdering>,
    pub file_compression_type: FileCompressionType,
    pub new_lines_in_values: bool,
    pub file_source: Arc<dyn FileSource>,
}

impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let object_store = context.runtime_env().object_store(&self.object_store_url)?;

        let source = self
            .file_source
            .with_batch_size(context.session_config().batch_size())
            .with_schema(Arc::clone(&self.file_schema))
            .with_projection(self);

        let opener = source.create_file_opener(object_store, self, partition);

        let stream = FileStream::new(self, partition, opener, source.metrics())?;
        Ok(Box::pin(stream))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let (schema, _, _, orderings) = self.project();

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        self.fmt_file_source(t, f)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        let source = self.file_source.repartitioned(
            target_partitions,
            repartition_file_min_size,
            output_ordering,
            self,
        )?;

        Ok(source.map(|s| Arc::new(s) as _))
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        let (schema, constraints, _, orderings) = self.project();
        EquivalenceProperties::new_with_orderings(schema, orderings.as_slice())
            .with_constraints(constraints)
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_stats())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let source = self.clone();
        Some(Arc::new(source.with_limit(limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
            expr.as_any()
                .downcast_ref::<Column>()
                .map(|expr| expr.index() >= self.file_schema.fields().len())
                .unwrap_or(false)
        });

        Ok(
            (all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj)
                .then(|| {
                    let file_scan = self.clone();
                    let source = Arc::clone(&file_scan.file_source);
                    let new_projections = new_projections_for_columns(
                        projection,
                        &file_scan
                            .projection
                            .clone()
                            .unwrap_or((0..self.file_schema.fields().len()).collect()),
                    );
                    file_scan
                        .with_projection(Some(new_projections))
                        .with_source(source)
                        .build() as _
                }),
        )
    }
}

impl FileScanConfig {
    pub fn new(
        object_store_url: ObjectStoreUrl,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
    ) -> Self {
        let statistics = Statistics::new_unknown(&file_schema);

        let mut config = Self {
            object_store_url,
            file_schema,
            file_groups: vec![],
            constraints: Constraints::empty(),
            statistics,
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: vec![],
            file_compression_type: FileCompressionType::UNCOMPRESSED,
            new_lines_in_values: false,
            file_source: Arc::clone(&file_source),
        };

        config = config.with_source(Arc::clone(&file_source));
        config
    }

    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
        self.file_source = file_source.with_statistics(self.statistics.clone());
        self
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = statistics.clone();
        self.file_source = self.file_source.with_statistics(statistics);
        self
    }

    fn projection_indices(&self) -> Vec<usize> {
        match &self.projection {
            Some(proj) => proj.clone(),
            None => (0..self.file_schema.fields().len()
                + self.table_partition_cols.len())
                .collect(),
        }
    }

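    /// Statistics for the projected columns: file columns keep the statistics
    /// reported by the file source (or this config), while table partition
    /// columns are reported as unknown.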
    fn projected_stats(&self) -> Statistics {
        let statistics = self
            .file_source
            .statistics()
            .unwrap_or(self.statistics.clone());

        let table_cols_stats = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    statistics.column_statistics[idx].clone()
                } else {
                    ColumnStatistics::new_unknown()
                }
            })
            .collect();

        Statistics {
            num_rows: statistics.num_rows,
            total_byte_size: statistics.total_byte_size,
            column_statistics: table_cols_stats,
        }
    }

    fn projected_schema(&self) -> Arc<Schema> {
        let table_fields: Vec<_> = self
            .projection_indices()
            .into_iter()
            .map(|idx| {
                if idx < self.file_schema.fields().len() {
                    self.file_schema.field(idx).clone()
                } else {
                    let partition_idx = idx - self.file_schema.fields().len();
                    self.table_partition_cols[partition_idx].clone()
                }
            })
            .collect();

        Arc::new(Schema::new_with_metadata(
            table_fields,
            self.file_schema.metadata().clone(),
        ))
    }

    fn projected_constraints(&self) -> Constraints {
        let indexes = self.projection_indices();

        self.constraints
            .project(&indexes)
            .unwrap_or_else(Constraints::empty)
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    pub fn with_file(self, file: PartitionedFile) -> Self {
        self.with_file_group(vec![file])
    }

    pub fn with_file_groups(
        mut self,
        mut file_groups: Vec<Vec<PartitionedFile>>,
    ) -> Self {
        self.file_groups.append(&mut file_groups);
        self
    }

    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
        self.file_groups.push(file_group);
        self
    }

    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
        self.output_ordering = output_ordering;
        self
    }

    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
        self.new_lines_in_values = new_lines_in_values;
        self
    }

    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }

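    /// Project the schema, constraints, statistics and output orderings of
    /// this config according to `self.projection` and
    /// `self.table_partition_cols`.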
    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
        if self.projection.is_none() && self.table_partition_cols.is_empty() {
            return (
                Arc::clone(&self.file_schema),
                self.constraints.clone(),
                self.statistics.clone(),
                self.output_ordering.clone(),
            );
        }

        let schema = self.projected_schema();
        let constraints = self.projected_constraints();
        let stats = self.projected_stats();

        let output_ordering = get_projected_output_ordering(self, &schema);

        (schema, constraints, stats, output_ordering)
    }

    #[cfg_attr(not(feature = "avro"), allow(unused))]
    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .map(|col_idx| self.file_schema.field(*col_idx).name())
                .cloned()
                .collect()
        })
    }

    pub fn projected_file_schema(&self) -> SchemaRef {
        let fields = self.file_column_projection_indices().map(|indices| {
            indices
                .iter()
                .map(|col_idx| self.file_schema.field(*col_idx))
                .cloned()
                .collect::<Vec<_>>()
        });

        fields.map_or_else(
            || Arc::clone(&self.file_schema),
            |f| {
                Arc::new(Schema::new_with_metadata(
                    f,
                    self.file_schema.metadata.clone(),
                ))
            },
        )
    }

    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        self.projection.as_ref().map(|p| {
            p.iter()
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .copied()
                .collect()
        })
    }

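    /// Regroup the input files into new groups, using each file's min/max
    /// statistics for the columns in `sort_order`: files are taken in order
    /// of ascending minimum value and appended to the first group whose last
    /// file they do not overlap, so every resulting group is non-overlapping
    /// and internally ordered by `sort_order`. Returns an error if min/max
    /// statistics cannot be obtained for any file.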
    pub fn split_groups_by_statistics(
        table_schema: &SchemaRef,
        file_groups: &[Vec<PartitionedFile>],
        sort_order: &LexOrdering,
    ) -> Result<Vec<Vec<PartitionedFile>>> {
        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )
        .map_err(|e| {
            e.context("construct min/max statistics for split_groups_by_statistics")
        })?;

        let indices_sorted_by_min = statistics.min_values_sorted();
        let mut file_groups_indices: Vec<Vec<usize>> = vec![];

        for (idx, min) in indices_sorted_by_min {
            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
                min > statistics.max(
                    *group
                        .last()
                        .expect("groups should be nonempty at construction"),
                )
            });
            match file_group_to_insert {
                Some(group) => group.push(idx),
                None => file_groups_indices.push(vec![idx]),
            }
        }

        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                file_group_indices
                    .into_iter()
                    .map(|idx| flattened_files[idx].clone())
                    .collect()
            })
            .collect())
    }

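    /// Wrap this configuration in a [`DataSourceExec`] execution plan node.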
    pub fn build(self) -> Arc<DataSourceExec> {
        Arc::new(DataSourceExec::new(Arc::new(self)))
    }

    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }

    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
}

impl Debug for FileScanConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        write!(f, "statistics={:?}, ", self.statistics)?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}

impl DisplayAs for FileScanConfig {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        let (schema, _, _, orderings) = self.project();

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}

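/// Appends table partition columns (whose values are constant for all rows
/// coming from a given file) to the record batches produced by the file
/// scan, so that each batch matches the projected table schema.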
pub struct PartitionColumnProjector {
    key_buffer_cache: ZeroBufferGenerators,
    projected_partition_indexes: Vec<(usize, usize)>,
    projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
        let mut idx_map = HashMap::new();
        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
                idx_map.insert(partition_idx, schema_idx);
            }
        }

        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

        Self {
            projected_partition_indexes,
            key_buffer_cache: Default::default(),
            projected_schema,
        }
    }

    pub fn project(
        &mut self,
        file_batch: RecordBatch,
        partition_values: &[ScalarValue],
    ) -> Result<RecordBatch> {
        let expected_cols =
            self.projected_schema.fields().len() - self.projected_partition_indexes.len();

        if file_batch.columns().len() != expected_cols {
            return exec_err!(
                "Unexpected batch schema from file, expected {} cols but got {}",
                expected_cols,
                file_batch.columns().len()
            );
        }

        let mut cols = file_batch.columns().to_vec();
        for &(pidx, sidx) in &self.projected_partition_indexes {
            let p_value =
                partition_values
                    .get(pidx)
                    .ok_or(DataFusionError::Execution(
                        "Invalid partitioning found on disk".to_string(),
                    ))?;

            let mut partition_value = Cow::Borrowed(p_value);

            let field = self.projected_schema.field(sidx);
            let expected_data_type = field.data_type();
            let actual_data_type = partition_value.data_type();
            if let DataType::Dictionary(key_type, _) = expected_data_type {
                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
                    partition_value = Cow::Owned(ScalarValue::Dictionary(
                        key_type.clone(),
                        Box::new(partition_value.as_ref().clone()),
                    ));
                }
            }

            cols.insert(
                sidx,
                create_output_array(
                    &mut self.key_buffer_cache,
                    partition_value.as_ref(),
                    file_batch.num_rows(),
                )?,
            )
        }

        RecordBatch::try_new_with_options(
            Arc::clone(&self.projected_schema),
            cols,
            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
        )
        .map_err(Into::into)
    }
}

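/// Per-key-type cache of zeroed key buffers for the dictionary arrays that
/// hold partition values. Because a partition value is the same for every
/// row of a batch, the key array is all zeros, so one zeroed buffer can be
/// built once and re-sliced for subsequent batches.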
#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}

#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    cache: Option<Buffer>,
    _t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    const SIZE: usize = size_of::<T>();

    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
        match &mut self.cache {
            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
                buf.slice_with_length(0, n_vals * Self::SIZE)
            }
            _ => {
                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals);
                self.cache.insert(key_buffer_builder.finish()).clone()
            }
        }
    }
}

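/// Build a dictionary array of length `len` whose values array holds the
/// single `dict_val` and whose keys are all zero, taken from the cached
/// zeroed buffer.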
fn create_dict_array<T>(
    buffer_gen: &mut ZeroBufferGenerator<T>,
    dict_val: &ScalarValue,
    len: usize,
    data_type: DataType,
) -> Result<ArrayRef>
where
    T: ArrowNativeType,
{
    let dict_vals = dict_val.to_array()?;

    let sliced_key_buffer = buffer_gen.get_buffer(len);

    let mut builder = ArrayData::builder(data_type)
        .len(len)
        .add_buffer(sliced_key_buffer);
    builder = builder.add_child_data(dict_vals.to_data());
    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
        builder.build().unwrap(),
    )))
}

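/// Create an array of length `len` filled with the partition value `val`.
/// Dictionary-encoded scalars reuse the cached zeroed key buffers; any other
/// scalar type falls back to `ScalarValue::to_array_of_size`.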
fn create_output_array(
    key_buffer_cache: &mut ZeroBufferGenerators,
    val: &ScalarValue,
    len: usize,
) -> Result<ArrayRef> {
    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
        match key_type.as_ref() {
            DataType::Int8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::Int64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_i64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt8 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u8,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt16 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u16,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt32 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u32,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            DataType::UInt64 => {
                return create_dict_array(
                    &mut key_buffer_cache.gen_u64,
                    dict_val,
                    len,
                    val.data_type(),
                );
            }
            _ => {}
        }
    }

    val.to_array_of_size(len)
}

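/// Map each requested output ordering onto the projected schema. Sort
/// expressions referring to columns that are not in the projection truncate
/// the ordering at that point; orderings that end up empty, or that cannot be
/// verified (via min/max statistics) to hold within every multi-file group,
/// are dropped.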
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let mut all_orderings = vec![];
    for output_ordering in &base_config.output_ordering {
        let mut new_ordering = LexOrdering::default();
        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                let name = col.name();
                if let Some((idx, _)) = projected_schema.column_with_name(name) {
                    new_ordering.push(PhysicalSortExpr {
                        expr: Arc::new(Column::new(name, idx)),
                        options: *options,
                    });
                    continue;
                }
            }
            break;
        }

        if new_ordering.is_empty() {
            continue;
        }

        if base_config.file_groups.iter().any(|group| {
            if group.len() <= 1 {
                return false;
            }

            let statistics = match MinMaxStatistics::new_from_files(
                &new_ordering,
                projected_schema,
                base_config.projection.as_deref(),
                group,
            ) {
                Ok(statistics) => statistics,
                Err(e) => {
                    log::trace!("Error fetching statistics for file group: {e}");
                    return true;
                }
            };

            !statistics.is_sorted()
        }) {
            debug!(
                "Skipping specified output ordering {:?}. \
                 Some file groups couldn't be determined to be sorted: {:?}",
                base_config.output_ordering[0], base_config.file_groups
            );
            continue;
        }

        all_orderings.push(new_ordering);
    }
    all_orderings
}

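/// Wrap a partition column's data type in a `Dictionary(UInt16, ...)` type,
/// since a partition value repeats for every row read from a file and is
/// cheap to represent dictionary-encoded.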
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

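/// Wrap a partition value in a dictionary-encoded [`ScalarValue`] with
/// `UInt16` keys, the counterpart of [`wrap_partition_type_in_dict`].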
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}

#[cfg(test)]
mod tests {
    use crate::{test_util::MockSource, tests::aggr_test_schema};

    use super::*;
    use arrow::{
        array::{Int32Array, RecordBatch},
        compute::SortOptions,
    };

    use datafusion_common::stats::Precision;
    use datafusion_common::{assert_batches_eq, DFSchema};
    use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
    use datafusion_physical_expr::create_physical_expr;
    use std::collections::HashMap;

    fn create_physical_sort_expr(
        e: &SortExpr,
        input_dfschema: &DFSchema,
        execution_props: &ExecutionProps,
    ) -> Result<PhysicalSortExpr> {
        let SortExpr {
            expr,
            asc,
            nulls_first,
        } = e;
        Ok(PhysicalSortExpr {
            expr: create_physical_expr(expr, input_dfschema, execution_props)?,
            options: SortOptions {
                descending: !asc,
                nulls_first: *nulls_first,
            },
        })
    }

    pub fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }

    #[test]
    fn physical_plan_config_no_projection() {
        let file_schema = aggr_test_schema();
        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            to_partition_cols(vec![(
                "date".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            )]),
        );

        let (proj_schema, _, proj_statistics, _) = conf.project();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            proj_schema.field(file_schema.fields().len()).name(),
            "date",
            "partition columns are the last columns"
        );
        assert_eq!(
            proj_statistics.column_statistics.len(),
            file_schema.fields().len() + 1
        );
        let col_names = conf.projected_file_column_names();
        assert_eq!(col_names, None);

        let col_indices = conf.file_column_projection_indices();
        assert_eq!(col_indices, None);
    }

    #[test]
    fn physical_plan_config_no_projection_tab_cols_as_field() {
        let file_schema = aggr_test_schema();

        let table_partition_col =
            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
                .with_metadata(HashMap::from_iter(vec![(
                    "key_whatever".to_owned(),
                    "value_whatever".to_owned(),
                )]));

        let conf = config_for_projection(
            Arc::clone(&file_schema),
            None,
            Statistics::new_unknown(&file_schema),
            vec![table_partition_col.clone()],
        );

        let (proj_schema, _, _, _) = conf.project();
        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
        assert_eq!(
            *proj_schema.field(file_schema.fields().len()),
            table_partition_col,
            "partition columns are the last columns and must have all values defined in created field"
1169 );
1170 }
1171
1172 #[test]
1173 fn physical_plan_config_with_projection() {
1174 let file_schema = aggr_test_schema();
1175 let conf = config_for_projection(
1176 Arc::clone(&file_schema),
1177 Some(vec![file_schema.fields().len(), 0]),
1178 Statistics {
1179 num_rows: Precision::Inexact(10),
1180 column_statistics: (0..file_schema.fields().len())
1183 .map(|i| ColumnStatistics {
1184 distinct_count: Precision::Inexact(i),
1185 ..Default::default()
1186 })
1187 .collect(),
1188 total_byte_size: Precision::Absent,
1189 },
1190 to_partition_cols(vec![(
1191 "date".to_owned(),
1192 wrap_partition_type_in_dict(DataType::Utf8),
1193 )]),
1194 );
1195
1196 let (proj_schema, _, proj_statistics, _) = conf.project();
1197 assert_eq!(
1198 columns(&proj_schema),
1199 vec!["date".to_owned(), "c1".to_owned()]
1200 );
1201 let proj_stat_cols = proj_statistics.column_statistics;
1202 assert_eq!(proj_stat_cols.len(), 2);
1203 assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1206
1207 let col_names = conf.projected_file_column_names();
1208 assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1209
1210 let col_indices = conf.file_column_projection_indices();
1211 assert_eq!(col_indices, Some(vec![0]));
1212 }
1213
    #[test]
    fn partition_column_projector() {
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let partition_cols = vec![
            (
                "year".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "month".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "day".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];
        let statistics = Statistics {
            num_rows: Precision::Inexact(3),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&file_batch.schema()),
        };

        let conf = config_for_projection(
            file_batch.schema(),
            Some(vec![
                0,
                1,
                2,
                file_batch.schema().fields().len(),
                file_batch.schema().fields().len() + 2,
            ]),
            statistics.clone(),
            to_partition_cols(partition_cols.clone()),
        );

        let source_statistics = conf.file_source.statistics().unwrap();
        let conf_stats = conf.statistics().unwrap();

        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

        assert_eq!(conf_stats.column_statistics.len(), 5);

        assert_eq!(source_statistics, statistics);
        assert_eq!(source_statistics.column_statistics.len(), 3);

        let (proj_schema, ..) = conf.project();
        let mut proj = PartitionColumnProjector::new(
            proj_schema,
            &partition_cols
                .iter()
                .map(|x| x.0.clone())
                .collect::<Vec<_>>(),
        );

        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("26")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![5, 6, 7, 8, 9]),
            ("b", &vec![-10, -9, -8, -7, -6]),
            ("c", &vec![12, 13, 14, 15, 16]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("27")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+-----+----+------+-----+",
            "| a | b   | c  | year | day |",
            "+---+-----+----+------+-----+",
            "| 5 | -10 | 12 | 2021 | 27  |",
            "| 6 | -9  | 13 | 2021 | 27  |",
            "| 7 | -8  | 14 | 2021 | 27  |",
            "| 8 | -7  | 15 | 2021 | 27  |",
            "| 9 | -6  | 16 | 2021 | 27  |",
            "+---+-----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 3]),
            ("b", &vec![2, 3, 4]),
            ("c", &vec![4, 5, 6]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("28")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+---+---+------+-----+",
            "| a | b | c | year | day |",
            "+---+---+---+------+-----+",
            "| 0 | 2 | 4 | 2021 | 28  |",
            "| 1 | 3 | 5 | 2021 | 28  |",
            "| 3 | 4 | 6 | 2021 | 28  |",
            "+---+---+---+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let projected_batch = proj
            .project(
                file_batch,
                &[
                    ScalarValue::from("2021"),
                    ScalarValue::from("10"),
                    ScalarValue::from("26"),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);
    }

    #[test]
    fn test_projected_file_schema_with_partition_col() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            Some(vec![0, 3, 5, schema.fields().len()]),
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        let expected_columns = vec!["c1", "c4", "c6"];
        let actual_columns = projection
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect::<Vec<_>>();
        assert_eq!(expected_columns, actual_columns);
    }

    #[test]
    fn test_projected_file_schema_without_projection() {
        let schema = aggr_test_schema();
        let partition_cols = vec![
            (
                "part1".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "part2".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];

        let projection = config_for_projection(
            schema.clone(),
            None,
            Statistics::new_unknown(&schema),
            to_partition_cols(partition_cols),
        )
        .projected_file_schema();

        assert_eq!(projection.fields(), schema.fields());
    }

    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{path::Path, ObjectMeta};

        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(f64, f64)>>,
        }
        impl File {
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "no nullable sort columns",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
            },
        ];

        for case in cases {
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let sort_order = LexOrdering::from(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(table_schema.as_ref().clone())?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            );

            let partitioned_files =
                case.files.into_iter().map(From::from).collect::<Vec<_>>();
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                &[partitioned_files.clone()],
                &sort_order,
            );
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        return Ok(());

        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                PartitionedFile {
                    object_meta: ObjectMeta {
                        location: Path::from(format!(
                            "data/date={}/{}.parquet",
                            file.date, file.name
                        )),
                        last_modified: chrono::Utc.timestamp_nanos(0),
                        size: 0,
                        e_tag: None,
                        version: None,
                    },
                    partition_values: vec![ScalarValue::from(file.date)],
                    range: None,
                    statistics: Some(Statistics {
                        num_rows: Precision::Absent,
                        total_byte_size: Precision::Absent,
                        column_statistics: file
                            .statistics
                            .into_iter()
                            .map(|stats| {
                                stats
                                    .map(|(min, max)| ColumnStatistics {
                                        min_value: Precision::Exact(ScalarValue::from(
                                            min,
                                        )),
                                        max_value: Precision::Exact(ScalarValue::from(
                                            max,
                                        )),
                                        ..Default::default()
                                    })
                                    .unwrap_or_default()
                            })
                            .collect::<Vec<_>>(),
                    }),
                    extensions: None,
                    metadata_size_hint: None,
                }
            }
        }
    }

    fn config_for_projection(
        file_schema: SchemaRef,
        projection: Option<Vec<usize>>,
        statistics: Statistics,
        table_partition_cols: Vec<Field>,
    ) -> FileScanConfig {
        FileScanConfig::new(
            ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema,
            Arc::new(MockSource::default()),
        )
        .with_projection(projection)
        .with_statistics(statistics)
        .with_table_partition_cols(table_partition_cols)
    }

    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
        table_partition_cols
            .iter()
            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
            .collect::<Vec<_>>()
    }

    pub fn build_table_i32(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
    ) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new(a.0, DataType::Int32, false),
            Field::new(b.0, DataType::Int32, false),
            Field::new(c.0, DataType::Int32, false),
        ]);

        RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int32Array::from(a.1.clone())),
                Arc::new(Int32Array::from(b.1.clone())),
                Arc::new(Int32Array::from(c.1.clone())),
            ],
        )
        .unwrap()
    }
}
1769}