1use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26 Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
27};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
32#[expect(deprecated)]
33use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
34use datafusion_datasource::{
35 ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
36};
37use datafusion_execution::cache::TableScopedPath;
38use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
40use datafusion_expr::dml::InsertOp;
41use datafusion_expr::execution_props::ExecutionProps;
42use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
43use datafusion_physical_expr::create_lex_ordering;
44use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
45use datafusion_physical_expr_common::sort_expr::LexOrdering;
46use datafusion_physical_plan::ExecutionPlan;
47use datafusion_physical_plan::empty::EmptyExec;
48use futures::{Stream, StreamExt, TryStreamExt, future, stream};
49use object_store::ObjectStore;
50use std::any::Any;
51use std::collections::HashMap;
52use std::sync::Arc;
53
/// Outcome of listing and grouping the files backing a [`ListingTable`] scan
/// (produced by [`ListingTable::list_files_for_scan`]).
#[derive(Debug)]
pub struct ListFilesResult {
    /// The listed files, already split into groups for parallel scanning.
    pub file_groups: Vec<FileGroup>,
    /// Statistics aggregated across all listed files
    /// (via `compute_all_files_statistics`).
    pub statistics: Statistics,
    /// True when `file_groups` was produced by grouping on partition values
    /// (`FileGroup::group_by_partition_values`) rather than by size-based
    /// splitting (`split_files`).
    pub grouped_by_partition: bool,
}
64
/// A [`TableProvider`] backed by files listed from one or more object-store
/// paths, optionally organized into hive-style partition directories.
#[derive(Debug, Clone)]
pub struct ListingTable {
    /// Paths the table lists files from; the first path determines the
    /// object store URL used for scans.
    table_paths: Vec<ListingTableUrl>,
    /// Schema of the files themselves, excluding partition columns.
    file_schema: SchemaRef,
    /// `file_schema` with the partition columns appended as non-nullable
    /// fields (built in `try_new`).
    table_schema: SchemaRef,
    /// How the schema was obtained (carried over from the config).
    schema_source: SchemaSource,
    /// Listing behavior: format, partition columns, sort order, etc.
    options: ListingOptions,
    /// Optional SQL definition text, surfaced by `get_table_definition`.
    definition: Option<String>,
    /// Cache of per-file statistics and orderings, keyed by object path.
    collected_statistics: Arc<dyn FileStatisticsCache>,
    /// Table constraints reported to the planner via `constraints()`.
    constraints: Constraints,
    /// Per-column default expressions, looked up by `get_column_default`.
    column_defaults: HashMap<String, Expr>,
    /// Expression adapter factory forwarded to the scan configuration
    /// (see `with_expr_adapter` in `scan_with_args`).
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
199
200impl ListingTable {
201 pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
205 let schema_source = config.schema_source();
207
208 let file_schema = config
209 .file_schema
210 .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
211
212 let options = config
213 .options
214 .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
215
216 let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
218 for (part_col_name, part_col_type) in &options.table_partition_cols {
219 builder.push(Field::new(part_col_name, part_col_type.clone(), false));
220 }
221
222 let table_schema = Arc::new(
223 builder
224 .finish()
225 .with_metadata(file_schema.metadata().clone()),
226 );
227
228 let table = Self {
229 table_paths: config.table_paths,
230 file_schema,
231 table_schema,
232 schema_source,
233 options,
234 definition: None,
235 collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
236 constraints: Constraints::default(),
237 column_defaults: HashMap::new(),
238 expr_adapter_factory: config.expr_adapter_factory,
239 };
240
241 Ok(table)
242 }
243
244 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
246 self.constraints = constraints;
247 self
248 }
249
250 pub fn with_column_defaults(
252 mut self,
253 column_defaults: HashMap<String, Expr>,
254 ) -> Self {
255 self.column_defaults = column_defaults;
256 self
257 }
258
259 pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
266 self.collected_statistics =
267 cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
268 self
269 }
270
271 pub fn with_definition(mut self, definition: Option<String>) -> Self {
273 self.definition = definition;
274 self
275 }
276
277 pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
279 &self.table_paths
280 }
281
282 pub fn options(&self) -> &ListingOptions {
284 &self.options
285 }
286
287 pub fn schema_source(&self) -> SchemaSource {
289 self.schema_source
290 }
291
292 #[deprecated(
299 since = "52.0.0",
300 note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
301 )]
302 #[expect(deprecated)]
303 pub fn with_schema_adapter_factory(
304 self,
305 _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
306 ) -> Self {
307 self
309 }
310
311 #[deprecated(
318 since = "52.0.0",
319 note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
320 )]
321 #[expect(deprecated)]
322 pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
323 None
324 }
325
326 fn create_file_source(&self) -> Arc<dyn FileSource> {
328 let table_schema = TableSchema::new(
329 Arc::clone(&self.file_schema),
330 self.options
331 .table_partition_cols
332 .iter()
333 .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
334 .collect(),
335 );
336
337 self.options.format.file_source(table_schema)
338 }
339
340 pub fn try_create_output_ordering(
347 &self,
348 execution_props: &ExecutionProps,
349 file_groups: &[FileGroup],
350 ) -> datafusion_common::Result<Vec<LexOrdering>> {
351 if !self.options.file_sort_order.is_empty() {
353 return create_lex_ordering(
354 &self.table_schema,
355 &self.options.file_sort_order,
356 execution_props,
357 );
358 }
359 if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
360 return Ok(vec![ordering]);
361 }
362 Ok(vec![])
363 }
364}
365
366fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
375 enum CurrentOrderingState {
376 FirstFile,
378 SomeOrdering(LexOrdering),
380 NoOrdering,
382 }
383 let mut state = CurrentOrderingState::FirstFile;
384
385 for group in file_groups {
387 for file in group.iter() {
388 state = match (&state, &file.ordering) {
389 (CurrentOrderingState::FirstFile, Some(ordering)) => {
391 CurrentOrderingState::SomeOrdering(ordering.clone())
392 }
393 (CurrentOrderingState::FirstFile, None) => {
394 CurrentOrderingState::NoOrdering
395 }
396 (CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
398 let prefix_len = current
400 .as_ref()
401 .iter()
402 .zip(ordering.as_ref().iter())
403 .take_while(|(a, b)| a == b)
404 .count();
405 if prefix_len == 0 {
406 log::trace!(
407 "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
408 );
409 return None;
410 } else {
411 let ordering =
412 LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
413 .expect("prefix_len > 0, so ordering must be valid");
414 CurrentOrderingState::SomeOrdering(ordering)
415 }
416 }
417 (CurrentOrderingState::SomeOrdering(ordering), None)
420 | (CurrentOrderingState::NoOrdering, Some(ordering)) => {
421 log::trace!(
422 "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
423 );
424 return None;
425 }
426 (CurrentOrderingState::NoOrdering, None) => {
428 CurrentOrderingState::NoOrdering
429 }
430 };
431 }
432 }
433
434 match state {
435 CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
436 _ => None,
437 }
438}
439
440fn can_be_evaluated_for_partition_pruning(
443 partition_column_names: &[&str],
444 expr: &Expr,
445) -> bool {
446 !partition_column_names.is_empty()
447 && expr_applicable_for_cols(partition_column_names, expr)
448}
449
#[async_trait]
impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Full table schema: file columns plus partition columns.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Thin adapter over [`Self::scan_with_args`] for the classic `scan` API.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let options = ScanArgs::default()
            .with_projection(projection.map(|p| p.as_slice()))
            .with_filters(Some(filters))
            .with_limit(limit);
        Ok(self.scan_with_args(state, options).await?.into_inner())
    }

    /// Plans a scan: lists (and partition-prunes) the files, optionally
    /// regroups them by statistics, and delegates plan creation to the
    /// file format.
    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> datafusion_common::Result<ScanResult> {
        let projection = args.projection().map(|p| p.to_vec());
        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
        let limit = args.limit();

        // Resolve the partition columns against the table schema (fails if a
        // configured partition column is not part of the schema).
        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
            .collect::<datafusion_common::Result<Vec<_>>>()?;

        let table_partition_col_names = table_partition_cols
            .iter()
            .map(|field| field.name().as_str())
            .collect::<Vec<_>>();

        // Split the filters: those evaluable purely on partition columns are
        // applied during listing; the rest stay on the scan.
        let (partition_filters, filters): (Vec<_>, Vec<_>) =
            filters.iter().cloned().partition(|filter| {
                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
            });

        // Only push the limit into listing when no residual filters remain —
        // otherwise rows may be filtered out after listing, making an early
        // cutoff incorrect.
        let statistic_file_limit = if filters.is_empty() { limit } else { None };

        let ListFilesResult {
            file_groups: mut partitioned_file_lists,
            statistics,
            grouped_by_partition: partitioned_by_file_group,
        } = self
            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
            .await?;

        // Nothing matched: return an empty plan with the projected schema.
        if partitioned_file_lists.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
        }

        let output_ordering = self.try_create_output_ordering(
            state.execution_props(),
            &partitioned_file_lists,
        )?;
        // When enabled and an output ordering exists, try to regroup files by
        // their statistics so each group is internally ordered.
        match state
            .config_options()
            .execution
            .split_file_groups_by_statistics
            .then(|| {
                output_ordering.first().map(|output_ordering| {
                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
                        &self.table_schema,
                        &partitioned_file_lists,
                        output_ordering,
                        self.options.target_partitions,
                    )
                })
            })
            .flatten()
        {
            // Failure to split is non-fatal: keep the original grouping.
            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
            Some(Ok(new_groups)) => {
                if new_groups.len() <= self.options.target_partitions {
                    partitioned_file_lists = new_groups;
                } else {
                    log::debug!(
                        "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
                    )
                }
            }
            None => {} };

        // The first table path determines the object store; with no paths at
        // all there is nothing to scan.
        let Some(object_store_url) =
            self.table_paths.first().map(ListingTableUrl::object_store)
        else {
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
                Schema::empty(),
            )))));
        };

        let file_source = self.create_file_source();

        // Delegate physical plan creation to the configured file format.
        let plan = self
            .options
            .format
            .create_physical_plan(
                state,
                FileScanConfigBuilder::new(object_store_url, file_source)
                    .with_file_groups(partitioned_file_lists)
                    .with_constraints(self.constraints.clone())
                    .with_statistics(statistics)
                    .with_projection_indices(projection)?
                    .with_limit(limit)
                    .with_output_ordering(output_ordering)
                    .with_expr_adapter(self.expr_adapter_factory.clone())
                    .with_partitioned_by_file_group(partitioned_by_file_group)
                    .build(),
            )
            .await?;

        Ok(ScanResult::new(plan))
    }

    /// Partition-column-only filters are handled exactly (via pruning during
    /// listing); everything else is inexact and must be re-applied upstream.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
        let partition_column_names = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| col.0.as_str())
            .collect::<Vec<_>>();
        filters
            .iter()
            .map(|filter| {
                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
                {
                    return Ok(TableProviderFilterPushDown::Exact);
                }

                Ok(TableProviderFilterPushDown::Inexact)
            })
            .collect()
    }

    fn get_table_definition(&self) -> Option<&str> {
        self.definition.as_deref()
    }

    /// Plans an insert into this table by writing files through the format's
    /// sink. Only collection (directory-like) table paths are supported.
    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // The input must be logically compatible with the table schema.
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
            return plan_err!(
                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
            );
        }

        let store = state.runtime_env().object_store(table_path)?;

        // Re-list the current files (no filters) so the sink knows what
        // already exists under the table path.
        let file_list_stream = pruned_partition_list(
            state,
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;

        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
        let keep_partition_by_columns =
            state.config_options().execution.keep_partition_by_columns;

        // Invalidate any cached file listing for this table path, since the
        // insert will change the set of files.
        if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
            let key = TableScopedPath {
                table: table_path.get_table_ref().clone(),
                path: table_path.prefix().clone(),
            };
            let _ = lfc.remove(&key);
        }

        let config = FileSinkConfig {
            original_url: String::default(),
            object_store_url: self.table_paths()[0].object_store(),
            table_paths: self.table_paths().clone(),
            file_group,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            insert_op,
            keep_partition_by_columns,
            file_extension: self.options().format.get_ext(),
            file_output_mode: FileOutputMode::Automatic,
        };

        // Passing empty file groups means only a declared `file_sort_order`
        // (not per-file metadata) can impose an ordering requirement here.
        let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
        let order_requirements = orderings.into_iter().next().map(Into::into);

        self.options()
            .format
            .create_writer_physical_plan(input, state, config, order_requirements)
            .await
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}
695
impl ListingTable {
    /// Lists the files for a scan, applying partition pruning `filters`,
    /// collecting per-file statistics/orderings (when enabled), and grouping
    /// the result into [`FileGroup`]s.
    ///
    /// `limit`, when set, allows listing to stop early once enough rows are
    /// provably available (statistics then become inexact).
    pub async fn list_files_for_scan<'a>(
        &'a self,
        ctx: &'a dyn Session,
        filters: &'a [Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<ListFilesResult> {
        // All paths share one object store, taken from the first path; with
        // no paths at all the result is trivially empty.
        let store = if let Some(url) = self.table_paths.first() {
            ctx.runtime_env().object_store(url)?
        } else {
            return Ok(ListFilesResult {
                file_groups: vec![],
                statistics: Statistics::new_unknown(&self.file_schema),
                grouped_by_partition: false,
            });
        };
        // List every table path concurrently, pruning partitions via `filters`.
        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
            pruned_partition_list(
                ctx,
                store.as_ref(),
                table_path,
                filters,
                &self.options.file_extension,
                &self.options.table_partition_cols,
            )
        }))
        .await?;
        let meta_fetch_concurrency =
            ctx.config_options().execution.meta_fetch_concurrency;
        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
        // Attach statistics and ordering to each file, fetched concurrently
        // up to `meta_fetch_concurrency`; unknown stats when collection is off.
        let files = file_list
            .map(|part_file| async {
                let part_file = part_file?;
                let (statistics, ordering) = if self.options.collect_stat {
                    self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
                        .await?
                } else {
                    (Arc::new(Statistics::new_unknown(&self.file_schema)), None)
                };
                Ok(part_file
                    .with_statistics(statistics)
                    .with_ordering(ordering))
            })
            .boxed()
            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

        // Drain the stream, stopping early once `limit` rows are guaranteed;
        // `inexact_stats` records whether listing stopped before the end.
        let (file_group, inexact_stats) =
            get_files_with_limit(files, limit, self.options.collect_stat).await?;

        let threshold = ctx.config_options().optimizer.preserve_file_partitions;

        // Prefer partition-value grouping when enabled and it yields at least
        // `threshold` groups; otherwise fall back to size-based splitting.
        let (file_groups, grouped_by_partition) = if threshold > 0
            && !self.options.table_partition_cols.is_empty()
        {
            let grouped =
                file_group.group_by_partition_values(self.options.target_partitions);
            if grouped.len() >= threshold {
                (grouped, true)
            } else {
                // Too few partition groups: flatten and re-split by size.
                let all_files: Vec<_> =
                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
                (
                    FileGroup::new(all_files).split_files(self.options.target_partitions),
                    false,
                )
            }
        } else {
            (
                file_group.split_files(self.options.target_partitions),
                false,
            )
        };

        let (file_groups, stats) = compute_all_files_statistics(
            file_groups,
            self.schema(),
            self.options.collect_stat,
            inexact_stats,
        )?;

        Ok(ListFilesResult {
            file_groups,
            statistics: stats,
            grouped_by_partition,
        })
    }

    /// Returns statistics and ordering for one file, consulting the
    /// per-file cache first and inferring (then caching) on a miss.
    async fn do_collect_statistics_and_ordering(
        &self,
        ctx: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        part_file: &PartitionedFile,
    ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
        use datafusion_execution::cache::cache_manager::CachedFileMetadata;

        let path = &part_file.object_meta.location;
        let meta = &part_file.object_meta;

        // Cache hit: only valid while the object metadata still matches
        // (i.e. the file has not changed since the entry was stored).
        if let Some(cached) = self.collected_statistics.get(path)
            && cached.is_valid_for(meta)
        {
            return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
        }

        // Cache miss: ask the file format to infer stats and ordering.
        let file_meta = self
            .options
            .format
            .infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
            .await?;

        let statistics = Arc::new(file_meta.statistics);

        // Store alongside the object metadata so staleness can be detected.
        self.collected_statistics.put(
            path,
            CachedFileMetadata::new(
                meta.clone(),
                Arc::clone(&statistics),
                file_meta.ordering.clone(),
            ),
        );

        Ok((statistics, file_meta.ordering))
    }
}
843
844async fn get_files_with_limit(
865 files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
866 limit: Option<usize>,
867 collect_stats: bool,
868) -> datafusion_common::Result<(FileGroup, bool)> {
869 let mut file_group = FileGroup::default();
870 let mut all_files = Box::pin(files.fuse());
872 enum ProcessingState {
873 ReadingFiles,
874 ReachedLimit,
875 }
876
877 let mut state = ProcessingState::ReadingFiles;
878 let mut num_rows = Precision::Absent;
879
880 while let Some(file_result) = all_files.next().await {
881 if matches!(state, ProcessingState::ReachedLimit) {
883 break;
884 }
885
886 let file = file_result?;
887
888 if collect_stats && let Some(file_stats) = &file.statistics {
890 num_rows = if file_group.is_empty() {
891 file_stats.num_rows
893 } else {
894 num_rows.add(&file_stats.num_rows)
896 };
897 }
898
899 file_group.push(file);
901
902 if let Some(limit) = limit
904 && let Precision::Exact(row_count) = num_rows
905 && row_count > limit
906 {
907 state = ProcessingState::ReachedLimit;
908 }
909 }
910 let inexact_stats = all_files.next().await.is_some();
914 Ok((file_group, inexact_stats))
915}
916
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
    use std::sync::Arc;

    /// Builds a sort expression on column `name` at index `idx` with the
    /// given direction and null placement.
    fn sort_expr(
        name: &str,
        idx: usize,
        descending: bool,
        nulls_first: bool,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr::new(
            Arc::new(Column::new(name, idx)),
            SortOptions {
                descending,
                nulls_first,
            },
        )
    }

    /// Wraps sort expressions into a `LexOrdering`, panicking on empty input.
    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
        LexOrdering::new(exprs).expect("expected non-empty ordering")
    }

    /// Creates a 1 KiB test file with an optional declared ordering.
    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
    }

    // Identical orderings across all files and groups → that ordering.
    #[test]
    fn test_derive_common_ordering_all_files_same_ordering() {
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![
            FileGroup::new(vec![
                create_file("f1.parquet", Some(ordering.clone())),
                create_file("f2.parquet", Some(ordering.clone())),
            ]),
            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
        ];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }

    // Orderings [a, b, c] and [a, b] → shared prefix [a, b].
    #[test]
    fn test_derive_common_ordering_common_prefix() {
        let ordering_abc = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
            sort_expr("c", 2, false, true),
        ]);
        let ordering_ab = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, false, true),
        ]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_abc)),
            create_file("f2.parquet", Some(ordering_ab.clone())),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering_ab));
    }

    // Orderings [a] and [b] share no prefix → no common ordering.
    #[test]
    fn test_derive_common_ordering_no_common_prefix() {
        let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
        let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering_a)),
            create_file("f2.parquet", Some(ordering_b)),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    // One ordered file plus one unordered file → no common ordering.
    #[test]
    fn test_derive_common_ordering_mixed_with_none() {
        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);

        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", Some(ordering)),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    // No file declares an ordering → no common ordering.
    #[test]
    fn test_derive_common_ordering_all_none() {
        let file_groups = vec![FileGroup::new(vec![
            create_file("f1.parquet", None),
            create_file("f2.parquet", None),
        ])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    // No files at all → no common ordering.
    #[test]
    fn test_derive_common_ordering_empty_groups() {
        let file_groups: Vec<FileGroup> = vec![];
        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, None);
    }

    // A single ordered file → its ordering is the common ordering.
    #[test]
    fn test_derive_common_ordering_single_file() {
        let ordering = lex_ordering(vec![
            sort_expr("a", 0, false, true),
            sort_expr("b", 1, true, false),
        ]);

        let file_groups = vec![FileGroup::new(vec![create_file(
            "f1.parquet",
            Some(ordering.clone()),
        )])];

        let result = derive_common_ordering_from_files(&file_groups);
        assert_eq!(result, Some(ordering));
    }
}