use crate::config::SchemaSource;
use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
use crate::{ListingOptions, ListingTableConfig};
use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
    ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;
use futures::{Stream, StreamExt, TryStreamExt, future, stream};
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

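/// Result of listing and grouping the files that back a [`ListingTable`]
/// scan, produced by [`ListingTable::list_files_for_scan`]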
#[derive(Debug)]
pub struct ListFilesResult {
    /// The listed files, already split into groups for parallel scanning
    pub file_groups: Vec<FileGroup>,
    /// Combined statistics for all listed files
    pub statistics: Statistics,
    /// Whether each group holds files from a single partition (grouped by
    /// partition values) rather than being split arbitrarily
    pub grouped_by_partition: bool,
}

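/// A [`TableProvider`] that scans one or more files on an [`ObjectStore`],
/// such as a directory of Parquet or CSV files, optionally laid out in
/// Hive-style partition directories.
///
/// A minimal construction sketch, not taken from this file: it assumes a
/// `session_state` implementing [`Session`], a `table_url` from
/// [`ListingTableUrl::parse`], and `listing_options` built elsewhere, and it
/// relies on the [`ListingTableConfig`] builder methods from this crate.
///
/// ```ignore
/// let config = ListingTableConfig::new(table_url)
///     .with_listing_options(listing_options)
///     .infer_schema(&session_state)
///     .await?;
/// let table = ListingTable::try_new(config)?;
/// ```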
#[derive(Debug, Clone)]
pub struct ListingTable {
    /// The paths (files or directories) this table reads from
    table_paths: Vec<ListingTableUrl>,
    /// Schema of the files on disk, without partition columns
    file_schema: SchemaRef,
    /// `file_schema` with the partition columns appended
    table_schema: SchemaRef,
    /// How `file_schema` was determined (e.g. specified vs. inferred)
    schema_source: SchemaSource,
    /// Options controlling listing, such as file format and partition columns
    options: ListingOptions,
    /// SQL text used to create this table, if known
    definition: Option<String>,
    /// Cache of collected per-file statistics
    collected_statistics: Arc<dyn FileStatisticsCache>,
    /// Constraints declared on this table
    constraints: Constraints,
    /// Default expressions for columns, keyed by column name
    column_defaults: HashMap<String, Expr>,
    /// Optional factory for adapting physical expressions to file schemas
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl ListingTable {
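    /// Create a new [`ListingTable`] from the given [`ListingTableConfig`].
    ///
    /// Fails if the config has no file schema or no [`ListingOptions`]. The
    /// table schema is built by appending the configured partition columns
    /// to the file schema.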
    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
        let schema_source = config.schema_source();

        let file_schema = config
            .file_schema
            .ok_or_else(|| internal_datafusion_err!("No schema provided"))?;

        let options = config
            .options
            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;

        // The table schema is the file schema with the partition columns
        // appended at the end
        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
        for (part_col_name, part_col_type) in &options.table_partition_cols {
            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
        }

        let table_schema = Arc::new(
            builder
                .finish()
                .with_metadata(file_schema.metadata().clone()),
        );

        let table = Self {
            table_paths: config.table_paths,
            file_schema,
            table_schema,
            schema_source,
            options,
            definition: None,
            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
            constraints: Constraints::default(),
            column_defaults: HashMap::new(),
            expr_adapter_factory: config.expr_adapter_factory,
        };

        Ok(table)
    }

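    /// Set the [`Constraints`] declared on this table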
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

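    /// Set the default expressions for columns, keyed by column name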
    pub fn with_column_defaults(
        mut self,
        column_defaults: HashMap<String, Expr>,
    ) -> Self {
        self.column_defaults = column_defaults;
        self
    }

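    /// Set the file statistics cache, falling back to a new
    /// [`DefaultFileStatisticsCache`] when `None` is given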
    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
        self.collected_statistics =
            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
        self
    }

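    /// Set the SQL text used to create this table, if any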
    pub fn with_definition(mut self, definition: Option<String>) -> Self {
        self.definition = definition;
        self
    }

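    /// Return the paths this table reads from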
    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
        &self.table_paths
    }

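    /// Return the [`ListingOptions`] for this table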
    pub fn options(&self) -> &ListingOptions {
        &self.options
    }

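    /// Return how this table's schema was determined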
    pub fn schema_source(&self) -> SchemaSource {
        self.schema_source
    }

    #[deprecated(
        since = "52.0.0",
        note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
    )]
    #[expect(deprecated)]
    pub fn with_schema_adapter_factory(
        self,
        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Self {
        self
    }

    #[deprecated(
        since = "52.0.0",
        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
    )]
    #[expect(deprecated)]
    pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }

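    /// Build the [`FileSource`] used for scans, pairing the file schema with
    /// the partition columns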
    fn create_file_source(&self) -> Arc<dyn FileSource> {
        let table_schema = TableSchema::new(
            Arc::clone(&self.file_schema),
            self.options
                .table_partition_cols
                .iter()
                .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
                .collect(),
        );

        self.options.format.file_source(table_schema)
    }

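    /// Convert the configured `file_sort_order` into physical
    /// [`LexOrdering`]s against the table schema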
    pub fn try_create_output_ordering(
        &self,
        execution_props: &ExecutionProps,
    ) -> datafusion_common::Result<Vec<LexOrdering>> {
        create_lex_ordering(
            &self.table_schema,
            &self.options.file_sort_order,
            execution_props,
        )
    }
}

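/// Returns `true` if `expr` can be resolved entirely from the partition
/// columns, and can therefore be evaluated during partition pruning (from
/// file paths alone) without reading any file contents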
fn can_be_evaluated_for_partition_pruning(
    partition_column_names: &[&str],
    expr: &Expr,
) -> bool {
    !partition_column_names.is_empty()
        && expr_applicable_for_cols(partition_column_names, expr)
}

#[async_trait]
impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let options = ScanArgs::default()
            .with_projection(projection.map(|p| p.as_slice()))
            .with_filters(Some(filters))
            .with_limit(limit);
        Ok(self.scan_with_args(state, options).await?.into_inner())
    }

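    /// Plan a scan: partition-only filters prune the file listing, the
    /// remaining filters are handed to the file format, and the listed file
    /// groups are optionally re-split by statistics before building the
    /// [`FileScanConfig`]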
    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> datafusion_common::Result<ScanResult> {
        let projection = args.projection().map(|p| p.to_vec());
        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
        let limit = args.limit();

        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| {
                Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone()))
            })
            .collect::<datafusion_common::Result<Vec<_>>>()?;

        let table_partition_col_names = table_partition_cols
            .iter()
            .map(|field| field.name().as_str())
            .collect::<Vec<_>>();

        // Split filters into those evaluable purely on partition columns
        // (used to prune the file listing) and the rest
        let (partition_filters, filters): (Vec<_>, Vec<_>) =
            filters.iter().cloned().partition(|filter| {
                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
            });

        // The limit can only bound the listing when no other filters remain,
        // since those filters may discard rows after the scan
        let statistic_file_limit = if filters.is_empty() { limit } else { None };

        let ListFilesResult {
            file_groups: mut partitioned_file_lists,
            statistics,
            grouped_by_partition: partitioned_by_file_group,
        } = self
            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
            .await?;

        // If no files match, return an empty plan with the projected schema
        if partitioned_file_lists.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
        }

        let output_ordering = self.try_create_output_ordering(state.execution_props())?;
        match state
            .config_options()
            .execution
            .split_file_groups_by_statistics
            .then(|| {
                output_ordering.first().map(|output_ordering| {
                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
                        &self.table_schema,
                        &partitioned_file_lists,
                        output_ordering,
                        self.options.target_partitions,
                    )
                })
            })
            .flatten()
        {
            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
            Some(Ok(new_groups)) => {
                if new_groups.len() <= self.options.target_partitions {
                    partitioned_file_lists = new_groups;
                } else {
                    log::debug!(
                        "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
                    )
                }
            }
            None => {}
        }

        // Without any table paths there is nothing to scan
        let Some(object_store_url) =
            self.table_paths.first().map(ListingTableUrl::object_store)
        else {
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
                Schema::empty(),
            )))));
        };

        let file_source = self.create_file_source();

        // Delegate plan creation to the file format (e.g. Parquet, CSV)
        let plan = self
            .options
            .format
            .create_physical_plan(
                state,
                FileScanConfigBuilder::new(object_store_url, file_source)
                    .with_file_groups(partitioned_file_lists)
                    .with_constraints(self.constraints.clone())
                    .with_statistics(statistics)
                    .with_projection_indices(projection)?
                    .with_limit(limit)
                    .with_output_ordering(output_ordering)
                    .with_expr_adapter(self.expr_adapter_factory.clone())
                    .with_partitioned_by_file_group(partitioned_by_file_group)
                    .build(),
            )
            .await?;

        Ok(ScanResult::new(plan))
    }

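    /// Filters on partition columns are [`TableProviderFilterPushDown::Exact`]
    /// because pruning the file listing fully applies them; all other filters
    /// are [`TableProviderFilterPushDown::Inexact`] and must be re-evaluated
    /// above the scan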
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
        let partition_column_names = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| col.0.as_str())
            .collect::<Vec<_>>();
        filters
            .iter()
            .map(|filter| {
                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
                {
                    return Ok(TableProviderFilterPushDown::Exact);
                }

                Ok(TableProviderFilterPushDown::Inexact)
            })
            .collect()
    }

    fn get_table_definition(&self) -> Option<&str> {
        self.definition.as_deref()
    }

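    /// Plan writing `input` into the files behind this table.
    ///
    /// Only directory-backed tables (URLs ending in `/`) support inserts.
    /// Any cached file listing for this table is invalidated so subsequent
    /// scans see the newly written files.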
    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // The input plan must produce the same (logical) schema as this table
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
            return plan_err!(
                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
            );
        }

        let store = state.runtime_env().object_store(table_path)?;

        let file_list_stream = pruned_partition_list(
            state,
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;

        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
        let keep_partition_by_columns =
            state.config_options().execution.keep_partition_by_columns;

        // Invalidate the cached file listing so later scans pick up the
        // newly written files
        if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
            let key = TableScopedPath {
                table: table_path.get_table_ref().clone(),
                path: table_path.prefix().clone(),
            };
            let _ = lfc.remove(&key);
        }

        let config = FileSinkConfig {
            original_url: String::default(),
            object_store_url: self.table_paths()[0].object_store(),
            table_paths: self.table_paths().clone(),
            file_group,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            insert_op,
            keep_partition_by_columns,
            file_extension: self.options().format.get_ext(),
        };

        let orderings = self.try_create_output_ordering(state.execution_props())?;
        // Use the first configured ordering, if any, as the sink's sort requirement
        let order_requirements = orderings.into_iter().next().map(Into::into);

        self.options()
            .format
            .create_writer_physical_plan(input, state, config, order_requirements)
            .await
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}

impl ListingTable {
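    /// List the files to scan, pruning partitions with `filters`, optionally
    /// collecting per-file statistics, and splitting the result into file
    /// groups.
    ///
    /// When `limit` is given and row-count statistics are exact, listing
    /// stops early once enough rows have been seen; the returned statistics
    /// are then marked inexact.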
    pub async fn list_files_for_scan<'a>(
        &'a self,
        ctx: &'a dyn Session,
        filters: &'a [Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<ListFilesResult> {
        let store = if let Some(url) = self.table_paths.first() {
            ctx.runtime_env().object_store(url)?
        } else {
            return Ok(ListFilesResult {
                file_groups: vec![],
                statistics: Statistics::new_unknown(&self.file_schema),
                grouped_by_partition: false,
            });
        };
        // List files from all table paths, pruning partitions that cannot
        // match `filters`
        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
            pruned_partition_list(
                ctx,
                store.as_ref(),
                table_path,
                filters,
                &self.options.file_extension,
                &self.options.table_partition_cols,
            )
        }))
        .await?;
        let meta_fetch_concurrency =
            ctx.config_options().execution.meta_fetch_concurrency;
        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
        // Attach statistics to each file, fetching them concurrently
        let files = file_list
            .map(|part_file| async {
                let part_file = part_file?;
                let statistics = if self.options.collect_stat {
                    self.do_collect_statistics(ctx, &store, &part_file).await?
                } else {
                    Arc::new(Statistics::new_unknown(&self.file_schema))
                };
                Ok(part_file.with_statistics(statistics))
            })
            .boxed()
            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

        let (file_group, inexact_stats) =
            get_files_with_limit(files, limit, self.options.collect_stat).await?;

        // Preserve grouping by partition values when it yields at least
        // `threshold` groups; otherwise fall back to splitting files evenly
        // across `target_partitions`
        let threshold = ctx.config_options().optimizer.preserve_file_partitions;

        let (file_groups, grouped_by_partition) = if threshold > 0
            && !self.options.table_partition_cols.is_empty()
        {
            let grouped =
                file_group.group_by_partition_values(self.options.target_partitions);
            if grouped.len() >= threshold {
                (grouped, true)
            } else {
                let all_files: Vec<_> =
                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
                (
                    FileGroup::new(all_files).split_files(self.options.target_partitions),
                    false,
                )
            }
        } else {
            (
                file_group.split_files(self.options.target_partitions),
                false,
            )
        };

        let (file_groups, stats) = compute_all_files_statistics(
            file_groups,
            self.schema(),
            self.options.collect_stat,
            inexact_stats,
        )?;

        Ok(ListFilesResult {
            file_groups,
            statistics: stats,
            grouped_by_partition,
        })
    }

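    /// Collect statistics for the given file, consulting the statistics
    /// cache first and asking the file format to infer them on a cache miss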
    async fn do_collect_statistics(
        &self,
        ctx: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        part_file: &PartitionedFile,
    ) -> datafusion_common::Result<Arc<Statistics>> {
        match self
            .collected_statistics
            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
        {
            // Cache hit: reuse previously collected statistics
            Some(statistics) => Ok(statistics),
            // Cache miss: infer statistics via the file format and cache them
            None => {
                let statistics = self
                    .options
                    .format
                    .infer_stats(
                        ctx,
                        store,
                        Arc::clone(&self.file_schema),
                        &part_file.object_meta,
                    )
                    .await?;
                let statistics = Arc::new(statistics);
                self.collected_statistics.put_with_extra(
                    &part_file.object_meta.location,
                    Arc::clone(&statistics),
                    &part_file.object_meta,
                );
                Ok(statistics)
            }
        }
    }
}

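/// Drain `files` into a single [`FileGroup`], stopping early once exact
/// row-count statistics show that `limit` rows have been exceeded.
///
/// The returned flag is `true` when files were left unread in the stream, in
/// which case the accumulated statistics do not cover the whole table and
/// must be treated as inexact.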
async fn get_files_with_limit(
    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
    limit: Option<usize>,
    collect_stats: bool,
) -> datafusion_common::Result<(FileGroup, bool)> {
    let mut file_group = FileGroup::default();
    // Fuse the stream so we can safely test for remaining files after the loop
    let mut all_files = Box::pin(files.fuse());

    enum ProcessingState {
        ReadingFiles,
        ReachedLimit,
    }

    let mut state = ProcessingState::ReadingFiles;
    let mut num_rows = Precision::Absent;

    while let Some(file_result) = all_files.next().await {
        if matches!(state, ProcessingState::ReachedLimit) {
            break;
        }

        let file = file_result?;

        // Accumulate the running row count while statistics stay exact
        if collect_stats && let Some(file_stats) = &file.statistics {
            num_rows = if file_group.is_empty() {
                file_stats.num_rows
            } else {
                num_rows.add(&file_stats.num_rows)
            };
        }

        file_group.push(file);

        // Stop once we know for certain that `limit` rows are available
        if let Some(limit) = limit
            && let Precision::Exact(row_count) = num_rows
            && row_count > limit
        {
            state = ProcessingState::ReachedLimit;
        }
    }
    // If any file remains unread, the collected statistics are incomplete
    let inexact_stats = all_files.next().await.is_some();
    Ok((file_group, inexact_stats))
}