1use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26 Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
27};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
32#[expect(deprecated)]
33use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
34use datafusion_datasource::{
35 ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
36};
37use datafusion_execution::cache::TableScopedPath;
38use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39use datafusion_expr::dml::InsertOp;
40use datafusion_expr::execution_props::ExecutionProps;
41use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
42use datafusion_physical_expr::create_lex_ordering;
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::sort_expr::LexOrdering;
45use datafusion_physical_plan::ExecutionPlan;
46use datafusion_physical_plan::empty::EmptyExec;
47use futures::{Stream, StreamExt, TryStreamExt, future, stream};
48use object_store::ObjectStore;
49use std::collections::HashMap;
50use std::sync::Arc;
51
52#[derive(Debug)]
54pub struct ListFilesResult {
55 pub file_groups: Vec<FileGroup>,
57 pub statistics: Statistics,
59 pub grouped_by_partition: bool,
61}
62
63#[derive(Debug, Clone)]
171pub struct ListingTable {
172 table_paths: Vec<ListingTableUrl>,
173 file_schema: SchemaRef,
177 table_schema: SchemaRef,
181 schema_source: SchemaSource,
183 options: ListingOptions,
186 definition: Option<String>,
188 collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
190 constraints: Constraints,
192 column_defaults: HashMap<String, Expr>,
194 expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
196}
197
198impl ListingTable {
199 pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
203 let schema_source = config.schema_source();
205
206 let file_schema = config
207 .file_schema
208 .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
209
210 let options = config
211 .options
212 .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
213
214 let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
216 for (part_col_name, part_col_type) in &options.table_partition_cols {
217 builder.push(Field::new(part_col_name, part_col_type.clone(), false));
218 }
219
220 let table_schema = Arc::new(
221 builder
222 .finish()
223 .with_metadata(file_schema.metadata().clone()),
224 );
225
226 let table = Self {
227 table_paths: config.table_paths,
228 file_schema,
229 table_schema,
230 schema_source,
231 options,
232 definition: None,
233 collected_statistics: None,
234 constraints: Constraints::default(),
235 column_defaults: HashMap::new(),
236 expr_adapter_factory: config.expr_adapter_factory,
237 };
238
239 Ok(table)
240 }
241
242 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
244 self.constraints = constraints;
245 self
246 }
247
248 pub fn with_column_defaults(
250 mut self,
251 column_defaults: HashMap<String, Expr>,
252 ) -> Self {
253 self.column_defaults = column_defaults;
254 self
255 }
256
257 pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
263 self.collected_statistics = cache;
264 self
265 }
266
267 pub fn with_definition(mut self, definition: Option<String>) -> Self {
269 self.definition = definition;
270 self
271 }
272
273 pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
275 &self.table_paths
276 }
277
278 pub fn options(&self) -> &ListingOptions {
280 &self.options
281 }
282
283 pub fn schema_source(&self) -> SchemaSource {
285 self.schema_source
286 }
287
288 #[deprecated(
295 since = "52.0.0",
296 note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
297 )]
298 #[expect(deprecated)]
299 pub fn with_schema_adapter_factory(
300 self,
301 _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
302 ) -> Self {
303 self
305 }
306
307 #[deprecated(
314 since = "52.0.0",
315 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
316 )]
317 #[expect(deprecated)]
318 pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
319 None
320 }
321
322 fn create_file_source(&self) -> Arc<dyn FileSource> {
324 let table_schema = TableSchema::new(
325 Arc::clone(&self.file_schema),
326 self.options
327 .table_partition_cols
328 .iter()
329 .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
330 .collect(),
331 );
332
333 self.options.format.file_source(table_schema)
334 }
335
336 pub fn try_create_output_ordering(
343 &self,
344 execution_props: &ExecutionProps,
345 file_groups: &[FileGroup],
346 ) -> datafusion_common::Result<Vec<LexOrdering>> {
347 if !self.options.file_sort_order.is_empty() {
349 return create_lex_ordering(
350 &self.table_schema,
351 &self.options.file_sort_order,
352 execution_props,
353 );
354 }
355 if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
356 return Ok(vec![ordering]);
357 }
358 Ok(vec![])
359 }
360}
361
362fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
371 enum CurrentOrderingState {
372 FirstFile,
374 SomeOrdering(LexOrdering),
376 NoOrdering,
378 }
379 let mut state = CurrentOrderingState::FirstFile;
380
381 for group in file_groups {
383 for file in group.iter() {
384 state = match (&state, &file.ordering) {
385 (CurrentOrderingState::FirstFile, Some(ordering)) => {
387 CurrentOrderingState::SomeOrdering(ordering.clone())
388 }
389 (CurrentOrderingState::FirstFile, None) => {
390 CurrentOrderingState::NoOrdering
391 }
392 (CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
394 let prefix_len = current
396 .as_ref()
397 .iter()
398 .zip(ordering.as_ref().iter())
399 .take_while(|(a, b)| a == b)
400 .count();
401 if prefix_len == 0 {
402 log::trace!(
403 "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
404 );
405 return None;
406 } else {
407 let ordering =
408 LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
409 .expect("prefix_len > 0, so ordering must be valid");
410 CurrentOrderingState::SomeOrdering(ordering)
411 }
412 }
413 (CurrentOrderingState::SomeOrdering(ordering), None)
416 | (CurrentOrderingState::NoOrdering, Some(ordering)) => {
417 log::trace!(
418 "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
419 );
420 return None;
421 }
422 (CurrentOrderingState::NoOrdering, None) => {
424 CurrentOrderingState::NoOrdering
425 }
426 };
427 }
428 }
429
430 match state {
431 CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
432 _ => None,
433 }
434}
435
436fn can_be_evaluated_for_partition_pruning(
439 partition_column_names: &[&str],
440 expr: &Expr,
441) -> bool {
442 !partition_column_names.is_empty()
443 && expr_applicable_for_cols(partition_column_names, expr)
444}
445
446#[async_trait]
447impl TableProvider for ListingTable {
448 fn schema(&self) -> SchemaRef {
449 Arc::clone(&self.table_schema)
450 }
451
452 fn constraints(&self) -> Option<&Constraints> {
453 Some(&self.constraints)
454 }
455
456 fn table_type(&self) -> TableType {
457 TableType::Base
458 }
459
460 async fn scan(
461 &self,
462 state: &dyn Session,
463 projection: Option<&Vec<usize>>,
464 filters: &[Expr],
465 limit: Option<usize>,
466 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
467 let options = ScanArgs::default()
468 .with_projection(projection.map(|p| p.as_slice()))
469 .with_filters(Some(filters))
470 .with_limit(limit);
471 Ok(self.scan_with_args(state, options).await?.into_inner())
472 }
473
474 async fn scan_with_args<'a>(
475 &self,
476 state: &dyn Session,
477 args: ScanArgs<'a>,
478 ) -> datafusion_common::Result<ScanResult> {
479 let projection = args.projection().map(|p| p.to_vec());
480 let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
481 let limit = args.limit();
482
483 let table_partition_cols = self
485 .options
486 .table_partition_cols
487 .iter()
488 .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
489 .collect::<datafusion_common::Result<Vec<_>>>()?;
490
491 let table_partition_col_names = table_partition_cols
492 .iter()
493 .map(|field| field.name().as_str())
494 .collect::<Vec<_>>();
495
496 let (partition_filters, filters): (Vec<_>, Vec<_>) =
499 filters.iter().cloned().partition(|filter| {
500 can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
501 });
502
503 let statistic_file_limit = if filters.is_empty() { limit } else { None };
506
507 let ListFilesResult {
508 file_groups: mut partitioned_file_lists,
509 statistics,
510 grouped_by_partition: partitioned_by_file_group,
511 } = self
512 .list_files_for_scan(state, &partition_filters, statistic_file_limit)
513 .await?;
514
515 if partitioned_file_lists.is_empty() {
517 let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
518 return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
519 }
520
521 let output_ordering = self.try_create_output_ordering(
522 state.execution_props(),
523 &partitioned_file_lists,
524 )?;
525 match state
526 .config_options()
527 .execution
528 .split_file_groups_by_statistics
529 .then(|| {
530 output_ordering.first().map(|output_ordering| {
531 FileScanConfig::split_groups_by_statistics_with_target_partitions(
532 &self.table_schema,
533 &partitioned_file_lists,
534 output_ordering,
535 self.options.target_partitions,
536 )
537 })
538 })
539 .flatten()
540 {
541 Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
542 Some(Ok(new_groups)) => {
543 if new_groups.len() <= self.options.target_partitions {
544 partitioned_file_lists = new_groups;
545 } else {
546 log::debug!(
547 "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
548 )
549 }
550 }
551 None => {} };
553
554 let Some(object_store_url) =
555 self.table_paths.first().map(ListingTableUrl::object_store)
556 else {
557 return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
558 Schema::empty(),
559 )))));
560 };
561
562 let file_source = self.create_file_source();
563
564 let plan = self
566 .options
567 .format
568 .create_physical_plan(
569 state,
570 FileScanConfigBuilder::new(object_store_url, file_source)
571 .with_file_groups(partitioned_file_lists)
572 .with_constraints(self.constraints.clone())
573 .with_statistics(statistics)
574 .with_projection_indices(projection)?
575 .with_limit(limit)
576 .with_output_ordering(output_ordering)
577 .with_expr_adapter(self.expr_adapter_factory.clone())
578 .with_partitioned_by_file_group(partitioned_by_file_group)
579 .build(),
580 )
581 .await?;
582
583 Ok(ScanResult::new(plan))
584 }
585
586 fn supports_filters_pushdown(
587 &self,
588 filters: &[&Expr],
589 ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
590 let partition_column_names = self
591 .options
592 .table_partition_cols
593 .iter()
594 .map(|col| col.0.as_str())
595 .collect::<Vec<_>>();
596 filters
597 .iter()
598 .map(|filter| {
599 if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
600 {
601 return Ok(TableProviderFilterPushDown::Exact);
603 }
604
605 Ok(TableProviderFilterPushDown::Inexact)
606 })
607 .collect()
608 }
609
610 fn get_table_definition(&self) -> Option<&str> {
611 self.definition.as_deref()
612 }
613
614 async fn insert_into(
615 &self,
616 state: &dyn Session,
617 input: Arc<dyn ExecutionPlan>,
618 insert_op: InsertOp,
619 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
620 self.schema()
622 .logically_equivalent_names_and_types(&input.schema())?;
623
624 let table_path = &self.table_paths()[0];
625 if !table_path.is_collection() {
626 return plan_err!(
627 "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
628 To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
629 );
630 }
631
632 let store = state.runtime_env().object_store(table_path)?;
634
635 let file_list_stream = pruned_partition_list(
636 state,
637 store.as_ref(),
638 table_path,
639 &[],
640 &self.options.file_extension,
641 &self.options.table_partition_cols,
642 )
643 .await?;
644
645 let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
646 let keep_partition_by_columns =
647 state.config_options().execution.keep_partition_by_columns;
648
649 if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
651 let key = TableScopedPath {
652 table: table_path.get_table_ref().clone(),
653 path: table_path.prefix().clone(),
654 };
655 let _ = lfc.remove(&key);
656 }
657
658 let config = FileSinkConfig {
660 original_url: String::default(),
661 object_store_url: self.table_paths()[0].object_store(),
662 table_paths: self.table_paths().clone(),
663 file_group,
664 output_schema: self.schema(),
665 table_partition_cols: self.options.table_partition_cols.clone(),
666 insert_op,
667 keep_partition_by_columns,
668 file_extension: self.options().format.get_ext(),
669 file_output_mode: FileOutputMode::Automatic,
670 };
671
672 let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
674 let order_requirements = orderings.into_iter().next().map(Into::into);
676
677 self.options()
678 .format
679 .create_writer_physical_plan(input, state, config, order_requirements)
680 .await
681 }
682
683 fn get_column_default(&self, column: &str) -> Option<&Expr> {
684 self.column_defaults.get(column)
685 }
686}
687
688impl ListingTable {
689 pub async fn list_files_for_scan<'a>(
693 &'a self,
694 ctx: &'a dyn Session,
695 filters: &'a [Expr],
696 limit: Option<usize>,
697 ) -> datafusion_common::Result<ListFilesResult> {
698 let store = if let Some(url) = self.table_paths.first() {
699 ctx.runtime_env().object_store(url)?
700 } else {
701 return Ok(ListFilesResult {
702 file_groups: vec![],
703 statistics: Statistics::new_unknown(&self.file_schema),
704 grouped_by_partition: false,
705 });
706 };
707 let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
709 pruned_partition_list(
710 ctx,
711 store.as_ref(),
712 table_path,
713 filters,
714 &self.options.file_extension,
715 &self.options.table_partition_cols,
716 )
717 }))
718 .await?;
719 let meta_fetch_concurrency =
720 ctx.config_options().execution.meta_fetch_concurrency;
721 let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
722 let files = file_list
724 .map(|part_file| async {
725 let part_file = part_file?;
726 let (statistics, ordering) = if self.options.collect_stat {
727 self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
728 .await?
729 } else {
730 (Arc::new(Statistics::new_unknown(&self.file_schema)), None)
731 };
732 Ok(part_file
733 .with_statistics(statistics)
734 .with_ordering(ordering))
735 })
736 .boxed()
737 .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
738
739 let (file_group, inexact_stats) =
740 get_files_with_limit(files, limit, self.options.collect_stat).await?;
741
742 let threshold = ctx.config_options().optimizer.preserve_file_partitions;
748
749 let (file_groups, grouped_by_partition) = if threshold > 0
750 && !self.options.table_partition_cols.is_empty()
751 {
752 let grouped =
753 file_group.group_by_partition_values(self.options.target_partitions);
754 if grouped.len() >= threshold {
755 (grouped, true)
756 } else {
757 let all_files: Vec<_> =
758 grouped.into_iter().flat_map(|g| g.into_inner()).collect();
759 (
760 FileGroup::new(all_files).split_files(self.options.target_partitions),
761 false,
762 )
763 }
764 } else {
765 (
766 file_group.split_files(self.options.target_partitions),
767 false,
768 )
769 };
770
771 let (file_groups, stats) = compute_all_files_statistics(
772 file_groups,
773 self.schema(),
774 self.options.collect_stat,
775 inexact_stats,
776 )?;
777
778 Ok(ListFilesResult {
783 file_groups,
784 statistics: stats,
785 grouped_by_partition,
786 })
787 }
788
789 async fn do_collect_statistics_and_ordering(
795 &self,
796 ctx: &dyn Session,
797 store: &Arc<dyn ObjectStore>,
798 part_file: &PartitionedFile,
799 ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
800 use datafusion_execution::cache::cache_manager::CachedFileMetadata;
801
802 let path = TableScopedPath {
803 table: part_file.table_reference.clone(),
804 path: part_file.object_meta.location.clone(),
805 };
806 let meta = &part_file.object_meta;
807
808 if let Some(cache) = &self.collected_statistics
810 && let Some(cached) = cache.get(&path)
811 && cached.is_valid_for(meta)
812 {
813 return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
815 }
816
817 let file_meta = self
819 .options
820 .format
821 .infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
822 .await?;
823
824 let statistics = Arc::new(file_meta.statistics);
825
826 if let Some(cache) = &self.collected_statistics {
828 cache.put(
829 &path,
830 CachedFileMetadata::new(
831 meta.clone(),
832 Arc::clone(&statistics),
833 file_meta.ordering.clone(),
834 ),
835 );
836 }
837
838 Ok((statistics, file_meta.ordering))
839 }
840}
841
842async fn get_files_with_limit(
863 files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
864 limit: Option<usize>,
865 collect_stats: bool,
866) -> datafusion_common::Result<(FileGroup, bool)> {
867 let mut file_group = FileGroup::default();
868 let mut all_files = Box::pin(files.fuse());
870 enum ProcessingState {
871 ReadingFiles,
872 ReachedLimit,
873 }
874
875 let mut state = ProcessingState::ReadingFiles;
876 let mut num_rows = Precision::Absent;
877
878 while let Some(file_result) = all_files.next().await {
879 if matches!(state, ProcessingState::ReachedLimit) {
881 break;
882 }
883
884 let file = file_result?;
885
886 if collect_stats && let Some(file_stats) = &file.statistics {
888 num_rows = if file_group.is_empty() {
889 file_stats.num_rows
891 } else {
892 num_rows.add(&file_stats.num_rows)
894 };
895 }
896
897 file_group.push(file);
899
900 if let Some(limit) = limit
902 && let Precision::Exact(row_count) = num_rows
903 && row_count > limit
904 {
905 state = ProcessingState::ReachedLimit;
906 }
907 }
908 let inexact_stats = all_files.next().await.is_some();
912 Ok((file_group, inexact_stats))
913}
914
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

    /// Builds a sort expression on column `name` at index `idx`.
    fn sort_expr(
        name: &str,
        idx: usize,
        descending: bool,
        nulls_first: bool,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr::new(
            Arc::new(Column::new(name, idx)),
            SortOptions {
                descending,
                nulls_first,
            },
        )
    }

    /// Wraps `exprs` in a [`LexOrdering`]; panics if `exprs` is empty.
    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
        LexOrdering::new(exprs).expect("expected non-empty ordering")
    }

    /// Creates a file named `name` with size 1024 carrying `ordering`.
    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
    }

    #[test]
    fn test_derive_common_ordering_all_files_same_ordering() {
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![
            FileGroup::new(vec![
                create_file("f1.parquet", Some(ordering.clone())),
                create_file("f2.parquet", Some(ordering.clone())),
            ]),
            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
        ];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }

    #[test]
    fn test_derive_common_ordering_common_prefix() {
        let ordering_abc = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
            sort_expr("c", 2, false, true),
        ]);
        let ordering_ab = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
        ]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_abc)),
            create_file("f2.parquet", Some(ordering_ab.clone())),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering_ab));
    }

    #[test]
    fn test_derive_common_ordering_no_common_prefix() {
        let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
        let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_a)),
            create_file("f2.parquet", Some(ordering_b)),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_mixed_with_none() {
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering)),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_all_none() {
        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", None),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_empty_groups() {
        let file_groups: Vec<FileGroup> = vec![];
        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    #[test]
    fn test_derive_common_ordering_single_file() {
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![FileGroup::new(vec![create_file(
            "f1.parquet",
            Some(ordering.clone()),
        )])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }

    /// An unordered first file followed by an ordered one is a mixed set.
    #[test]
    fn test_derive_common_ordering_none_then_some() {
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", None),
            create_file("f2.parquet", Some(ordering)),
        ])];

        assert_eq!(derive_common_ordering_from_files(&file_groups), None);
    }

    /// The common prefix keeps shrinking as more groups are visited.
    #[test]
    fn test_derive_common_ordering_narrows_across_groups() {
        let a = sort_expr("a", 0, false, true);
        let b = sort_expr("b", 1, false, true);
        let c = sort_expr("c", 2, false, true);

        let file_groups = vec![
            FileGroup::new(vec![create_file(
                "f1.parquet",
                Some(lex_ordering(vec![a.clone(), b.clone(), c])),
            )]),
            FileGroup::new(vec![create_file(
                "f2.parquet",
                Some(lex_ordering(vec![a.clone(), b])),
            )]),
            FileGroup::new(vec![create_file(
                "f3.parquet",
                Some(lex_ordering(vec![a.clone()])),
            )]),
        ];

        assert_eq!(
            derive_common_ordering_from_files(&file_groups),
            Some(lex_ordering(vec![a]))
        );
    }

    /// Same column with different sort options shares no prefix.
    #[test]
    fn test_derive_common_ordering_sort_options_must_match() {
        let ascending = lex_ordering(vec![sort_expr("a", 0, false, true)]);
        let descending = lex_ordering(vec![sort_expr("a", 0, true, false)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ascending)),
            create_file("f2.parquet", Some(descending)),
        ])];

        assert_eq!(derive_common_ordering_from_files(&file_groups), None);
    }

    /// Empty groups contribute no files and do not affect the result.
    #[test]
    fn test_derive_common_ordering_skips_empty_groups() {
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![
            FileGroup::new(vec![]),
            FileGroup::new(vec![create_file("f1.parquet", Some(ordering.clone()))]),
            FileGroup::new(vec![]),
        ];

        assert_eq!(
            derive_common_ordering_from_files(&file_groups),
            Some(ordering)
        );
    }
}