datafusion_materialized_views/materialized/
file_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{StringBuilder, TimestampNanosecondBuilder, UInt64Builder};
19use arrow::record_batch::RecordBatch;
20use arrow_schema::{DataType, Field, TimeUnit};
21use async_trait::async_trait;
22use datafusion::arrow::datatypes::{Schema, SchemaRef};
23use datafusion::catalog::SchemaProvider;
24use datafusion::catalog::{CatalogProvider, Session};
25use datafusion::datasource::listing::ListingTableUrl;
26use datafusion::datasource::TableProvider;
27use datafusion::execution::object_store::ObjectStoreUrl;
28use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
29use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
30use datafusion::physical_plan::limit::LimitStream;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion::physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
35};
36use datafusion::{
37    catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream,
38};
39use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
40use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
41use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
42use futures::stream::{self, BoxStream};
43use futures::{future, Future, FutureExt, StreamExt, TryStreamExt};
44use itertools::Itertools;
45use log::debug;
46use object_store::{ObjectMeta, ObjectStore};
47use std::any::Any;
48use std::sync::Arc;
49
50use crate::materialized::cast_to_listing_table;
51
/// A virtual file metadata table, inspired by the information schema column table.
///
/// Produces one row per object-store file backing the tables registered in the
/// catalog list: catalog/schema/table names, the file path, its last-modified
/// timestamp (UTC nanoseconds) and its size in bytes.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// Output schema of the virtual table; built once in [`FileMetadata::new`].
    table_schema: SchemaRef,
    /// Catalogs whose tables are enumerated when listing files.
    catalog_list: Arc<dyn CatalogProviderList>,
    /// Source of [`ObjectMeta`] for each table's files.
    metadata_provider: Arc<dyn FileMetadataProvider>,
}
59
60impl FileMetadata {
61    /// Construct a new [`FileMetadata`] table provider that lists files for all
62    /// tables in the provided catalog list.
63    pub fn new(
64        catalog_list: Arc<dyn CatalogProviderList>,
65        metadata_provider: Arc<dyn FileMetadataProvider>,
66    ) -> Self {
67        Self {
68            table_schema: Arc::new(Schema::new(vec![
69                Field::new("table_catalog", DataType::Utf8, false),
70                Field::new("table_schema", DataType::Utf8, false),
71                Field::new("table_name", DataType::Utf8, false),
72                Field::new("file_path", DataType::Utf8, false),
73                Field::new(
74                    "last_modified",
75                    DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))),
76                    false,
77                ),
78                Field::new("size", DataType::UInt64, false),
79            ])),
80            catalog_list,
81            metadata_provider,
82        }
83    }
84}
85
86#[async_trait]
87impl TableProvider for FileMetadata {
88    fn as_any(&self) -> &dyn Any {
89        self
90    }
91
92    fn schema(&self) -> SchemaRef {
93        self.table_schema.clone()
94    }
95
96    fn table_type(&self) -> TableType {
97        TableType::Base
98    }
99
100    async fn scan(
101        &self,
102        session_state: &dyn Session,
103        projection: Option<&Vec<usize>>,
104        filters: &[Expr],
105        limit: Option<usize>,
106    ) -> Result<Arc<dyn ExecutionPlan>> {
107        let dfschema = DFSchema::try_from(self.table_schema.as_ref().clone())?;
108
109        let filters = filters
110            .iter()
111            .map(|expr| {
112                create_physical_expr(expr, &dfschema, session_state.execution_props())
113                    .map_err(|e| e.context("failed to create file metadata physical expr"))
114            })
115            .collect::<Result<Vec<_>, _>>()?;
116
117        let exec = FileMetadataExec::try_new(
118            self.table_schema.clone(),
119            projection.cloned(),
120            filters,
121            limit,
122            self.catalog_list.clone(),
123            self.metadata_provider.clone(),
124        )?;
125
126        Ok(Arc::new(exec))
127    }
128
129    fn supports_filters_pushdown(
130        &self,
131        filters: &[&Expr],
132    ) -> Result<Vec<TableProviderFilterPushDown>> {
133        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
134    }
135}
136
/// An [`ExecutionPlan`] that scans object store metadata.
pub struct FileMetadataExec {
    /// Full (unprojected) schema of the file metadata table.
    table_schema: SchemaRef,
    /// Cached plan properties; schema here reflects the projection.
    plan_properties: PlanProperties,
    /// Optional column projection (indices into `table_schema`).
    projection: Option<Vec<usize>>,
    /// Pushed-down physical filters; equality filters on the name columns
    /// are used to narrow the catalog/schema/table listing.
    filters: Vec<Arc<dyn PhysicalExpr>>,
    /// Optional row limit, enforced via a `LimitStream` in `execute`.
    limit: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    catalog_list: Arc<dyn CatalogProviderList>,
    metadata_provider: Arc<dyn FileMetadataProvider>,
}
148
149impl FileMetadataExec {
150    fn try_new(
151        table_schema: SchemaRef,
152        projection: Option<Vec<usize>>,
153        filters: Vec<Arc<dyn PhysicalExpr>>,
154        limit: Option<usize>,
155        catalog_list: Arc<dyn CatalogProviderList>,
156        metadata_provider: Arc<dyn FileMetadataProvider>,
157    ) -> Result<Self> {
158        let projected_schema = match projection.as_ref() {
159            Some(projection) => Arc::new(table_schema.project(projection)?),
160            None => table_schema.clone(),
161        };
162        let eq_properties = EquivalenceProperties::new(projected_schema);
163        let partitioning = Partitioning::UnknownPartitioning(1);
164        let plan_properties = PlanProperties::new(
165            eq_properties,
166            partitioning,
167            EmissionType::Final,
168            Boundedness::Bounded,
169        );
170
171        let exec = Self {
172            table_schema,
173            plan_properties,
174            projection,
175            filters,
176            limit,
177            metrics: ExecutionPlanMetricsSet::new(),
178            catalog_list,
179            metadata_provider,
180        };
181
182        Ok(exec)
183    }
184}
185
impl ExecutionPlan for FileMetadataExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "FileMetadataExec"
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    // Leaf node: metadata is read directly from the catalogs, no input plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // With no children, the plan is returned unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    // Produces the single output partition: builds the metadata batches,
    // applies the column projection, then wraps with the optional row limit.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let projection = self.projection.clone();
        let record_batches = self.build_record_batch(context)?;

        // Map the future's output: project each batch when a projection was
        // requested; a top-level error becomes a single-element error vec so it
        // still flows through the stream below.
        let projected_record_batches = record_batches.map(move |record_batches| {
            let record_batches = match record_batches {
                Ok(record_batches) => record_batches,
                Err(err) => return vec![Err(err)],
            };

            if let Some(projection) = projection {
                return record_batches
                    .into_iter()
                    .map(|record_batch| {
                        record_batch
                            .project(&projection)
                            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
                    })
                    .collect::<Vec<_>>();
            }

            record_batches.into_iter().map(Ok).collect::<Vec<_>>()
        });

        // Flatten the future of Vec<Result<RecordBatch>> into a record batch stream.
        // Note: schema() is the projected schema from plan_properties.
        let mut record_batch_stream: SendableRecordBatchStream =
            Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(projected_record_batches)
                    .map(stream::iter)
                    .flatten(),
            ));

        // Enforce the pushed-down LIMIT, recording row counts in baseline metrics.
        if let Some(limit) = self.limit {
            let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
            let limit_stream =
                LimitStream::new(record_batch_stream, 0, Some(limit), baseline_metrics);
            record_batch_stream = Box::pin(limit_stream);
        }

        Ok(record_batch_stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
260
261impl FileMetadataExec {
262    fn get_column_index(&self, column_name: &str) -> Result<usize> {
263        let (index, _) = self
264            .table_schema
265            .column_with_name(column_name)
266            .ok_or_else(|| {
267                DataFusionError::Internal(format!("column '{column_name}' does not exists"))
268            })?;
269        Ok(index)
270    }
271
272    /// Get the string literal value from an 'equals' BinaryExpr with a column.
273    fn get_column_literal(column_idx: usize, filter: &Arc<dyn PhysicalExpr>) -> Option<String> {
274        let binary_expr = filter.as_any().downcast_ref::<BinaryExpr>()?;
275
276        if !matches!(binary_expr.op(), Operator::Eq) {
277            return None;
278        }
279
280        let (column, literal) = if let Some(left_column) =
281            binary_expr.left().as_any().downcast_ref::<Column>()
282        {
283            let right_literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
284            (left_column, right_literal)
285        } else if let Some(right_column) = binary_expr.right().as_any().downcast_ref::<Column>() {
286            let left_literal = binary_expr.left().as_any().downcast_ref::<Literal>()?;
287            (right_column, left_literal)
288        } else {
289            return None;
290        };
291
292        if column.index() != column_idx {
293            return None;
294        }
295
296        match literal.value() {
297            ScalarValue::Utf8(str) => str.clone(),
298            ScalarValue::LargeUtf8(str) => str.clone(),
299            _ => None,
300        }
301    }
302
303    /// Builds a RecordBatch containing file metadata that satisfies the provided filters.
304    fn build_record_batch(
305        &self,
306        context: Arc<TaskContext>,
307    ) -> Result<impl Future<Output = Result<Vec<RecordBatch>>>> {
308        let catalog_column = self.get_column_index("table_catalog")?;
309        let schema_column = self.get_column_index("table_schema")?;
310        let table_column = self.get_column_index("table_name")?;
311
312        let catalog_name = self
313            .filters
314            .iter()
315            .filter_map(|filter| Self::get_column_literal(catalog_column, filter))
316            .next();
317
318        let schema_name = self
319            .filters
320            .iter()
321            .filter_map(|filter| Self::get_column_literal(schema_column, filter))
322            .next();
323
324        let table_name = self
325            .filters
326            .iter()
327            .filter_map(|filter| Self::get_column_literal(table_column, filter))
328            .next();
329
330        let table_schema = self.table_schema.clone();
331        let catalog_list = self.catalog_list.clone();
332        let metadata_provider = self.metadata_provider.clone();
333
334        let record_batch = async move {
335            // If we cannot determine the catalog, build from the entire catalog list.
336            let catalog_name = match catalog_name {
337                Some(catalog_name) => catalog_name,
338                None => {
339                    debug!("No catalog filter exists, returning entire catalog list.");
340                    return FileMetadataBuilder::build_from_catalog_list(
341                        catalog_list,
342                        metadata_provider,
343                        table_schema,
344                        context,
345                    )
346                    .await;
347                }
348            };
349
350            // If the specified catalog doesn't exist, return an empty result;
351            let catalog_provider = match catalog_list.catalog(&catalog_name) {
352                Some(catalog_provider) => catalog_provider,
353                None => {
354                    debug!("No catalog named '{catalog_name}' exists, returning an empty result.");
355                    return Ok(vec![]);
356                }
357            };
358
359            // If we cannot determine the schema, build from the catalog.
360            let schema_name = match schema_name {
361                Some(schema_name) => schema_name,
362                None => {
363                    debug!("No schema filter exists, returning catalog '{catalog_name}'.");
364                    return FileMetadataBuilder::build_from_catalog(
365                        &catalog_name,
366                        catalog_provider,
367                        metadata_provider,
368                        table_schema,
369                        context,
370                    )
371                    .await;
372                }
373            };
374
375            // If the specified schema doesn't exist, return an empty result.
376            let schema_provider = match catalog_provider.schema(&schema_name) {
377                Some(schema_provider) => schema_provider,
378                None => {
379                    debug!("No schema named '{catalog_name}.{schema_name}' exists, returning an empty result.");
380                    return Ok(vec![]);
381                }
382            };
383
384            // If we cannot determine a table , build from the schema.
385            let table_name = match table_name {
386                Some(table_name) => table_name,
387                None => {
388                    debug!(
389                        "No table filter exists, returning schema '{catalog_name}.{schema_name}'."
390                    );
391                    return FileMetadataBuilder::build_from_schema(
392                        &catalog_name,
393                        &schema_name,
394                        schema_provider,
395                        metadata_provider,
396                        table_schema,
397                        context,
398                    )
399                    .await;
400                }
401            };
402
403            // If the specified table doesn't exist, return an empty result;
404            let table_provider = match schema_provider.table(&table_name).await? {
405                Some(table_provider) => table_provider,
406                None => {
407                    debug!("No table named '{catalog_name}.{schema_name}.{table_name}' exists, returning an empty result.");
408                    return Ok(vec![]);
409                }
410            };
411
412            debug!("Returning table '{catalog_name}.{schema_name}.{table_name}'.");
413
414            let record_batch = FileMetadataBuilder::build_from_table(
415                &catalog_name,
416                &schema_name,
417                &table_name,
418                table_provider,
419                metadata_provider,
420                table_schema,
421                context,
422            )
423            .await?;
424
425            Ok(record_batch.into_iter().collect_vec())
426        };
427
428        Ok(record_batch)
429    }
430}
431
432impl std::fmt::Debug for FileMetadataExec {
433    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434        f.debug_struct("FileMetadataExec")
435            .field("plan_properties", &self.plan_properties)
436            .field("filters", &self.filters)
437            .field("limit", &self.limit)
438            .finish_non_exhaustive()
439    }
440}
441
442impl DisplayAs for FileMetadataExec {
443    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
444        write!(f, "FileMetadataExec: ")?;
445
446        write!(f, "filters=[")?;
447        let mut filters = self.filters.iter().peekable();
448        while let Some(filter) = filters.next() {
449            std::fmt::Display::fmt(filter, f)?;
450            if filters.peek().is_some() {
451                write!(f, ", ")?;
452            }
453        }
454        write!(f, "]")?;
455
456        if let Some(limit) = &self.limit {
457            write!(f, ", limit={limit}")?;
458        }
459
460        Ok(())
461    }
462}
463
/// Accumulates file metadata rows into Arrow array builders, one per output column.
struct FileMetadataBuilder {
    /// Output schema; column order must match the builders below.
    schema: SchemaRef,
    /// Used to list files for each table path.
    metadata_provider: Arc<dyn FileMetadataProvider>,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    file_paths: StringBuilder,
    /// Nanosecond timestamps in UTC, matching the table schema.
    last_modified: TimestampNanosecondBuilder,
    size: UInt64Builder,
}
474
impl FileMetadataBuilder {
    /// Create an empty builder accumulating rows for the given output `schema`.
    fn new(schema: SchemaRef, metadata_provider: Arc<dyn FileMetadataProvider>) -> Self {
        Self {
            schema,
            metadata_provider,
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            file_paths: StringBuilder::new(),
            // UTC timezone matches the `last_modified` field declared in FileMetadata::new.
            last_modified: TimestampNanosecondBuilder::new().with_timezone("UTC"),
            size: UInt64Builder::new(),
        }
    }

    /// Build file-metadata batches for every catalog in `catalog_list`,
    /// visiting all catalogs concurrently via `join_all`.
    async fn build_from_catalog_list(
        catalog_list: Arc<dyn CatalogProviderList>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for catalog_name in catalog_list.catalog_names() {
            // A catalog may be dropped between catalog_names() and catalog(); skip it.
            let catalog_provider = match catalog_list.catalog(&catalog_name) {
                Some(catalog_provider) => catalog_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_catalog(
                    &catalog_name,
                    catalog_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            });
        }

        // Wait for all per-catalog listings; the first error (if any) is returned.
        let results = future::join_all(tasks).await;

        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Build file-metadata batches for every schema in `catalog_provider`,
    /// visiting all schemas concurrently.
    async fn build_from_catalog(
        catalog_name: &str,
        catalog_provider: Arc<dyn CatalogProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for schema_name in catalog_provider.schema_names() {
            // Schemas may be dropped concurrently; skip missing ones.
            let schema_provider = match catalog_provider.schema(&schema_name) {
                Some(schema_provider) => schema_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_schema(
                    catalog_name,
                    &schema_name,
                    schema_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            });
        }

        // Wait for all per-schema listings; the first error (if any) is returned.
        let results = future::join_all(tasks).await;

        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Build file-metadata batches for every table in `schema_provider`.
    ///
    /// Table providers are resolved sequentially (`.await?` per table), but the
    /// per-table file listings then run concurrently.
    async fn build_from_schema(
        catalog_name: &str,
        schema_name: &str,
        schema_provider: Arc<dyn SchemaProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for table_name in schema_provider.table_names() {
            // Tables may be dropped concurrently; skip missing ones.
            let table_provider = match schema_provider.table(&table_name).await? {
                Some(table_provider) => table_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_table(
                    catalog_name,
                    schema_name,
                    &table_name,
                    table_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            })
        }

        let results = future::join_all(tasks).await;
        // Drop the per-table Option layer and flatten into a single batch list.
        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Build a single [`RecordBatch`] for one table, or `None` when the
    /// provider is not listing-table-like (i.e. not backed by object store files).
    async fn build_from_table(
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        table_provider: Arc<dyn TableProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Option<RecordBatch>> {
        let mut builder = Self::new(schema.clone(), metadata_provider.clone());

        // Tables without file listings contribute no rows.
        let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
            None => return Ok(None),
            Some(t) => t,
        };

        let table_paths = listing_table_like.table_paths();
        let file_extension = listing_table_like.file_ext();

        // A listing table may span multiple URLs; append files from each.
        for table_path in table_paths {
            builder
                .read_table_files(
                    catalog_name,
                    schema_name,
                    table_name,
                    &table_path,
                    &file_extension,
                    &context,
                )
                .await?;
        }

        builder.finish().map(Some)
    }

    /// List all files under `table_path` and append one row per file.
    async fn read_table_files(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        table_path: &ListingTableUrl,
        file_ext: &str,
        context: &TaskContext,
    ) -> Result<()> {
        let store_url = table_path.object_store();
        let store = context.runtime_env().object_store(table_path)?;

        let mut file_stream = self
            .metadata_provider
            .list_all_files(
                store.clone(),
                table_path.clone(),
                file_ext.to_string(),
                // Honor the session's subdirectory-listing option.
                context
                    .session_config()
                    .options()
                    .execution
                    .listing_table_ignore_subdirectory,
            )
            .await;

        while let Some(file_meta) = file_stream.try_next().await? {
            self.append(
                catalog_name,
                schema_name,
                table_name,
                &store_url,
                &file_meta,
            );
        }

        Ok(())
    }

    /// Append one row of file metadata to the column builders.
    fn append(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        store_url: &ObjectStoreUrl,
        meta: &ObjectMeta,
    ) {
        self.catalog_names.append_value(catalog_name);
        self.schema_names.append_value(schema_name);
        self.table_names.append_value(table_name);
        // Full path = object store URL + object location within the store.
        self.file_paths
            .append_value(format!("{store_url}{}", meta.location));
        // None if the timestamp overflows i64 nanoseconds.
        self.last_modified
            .append_option(meta.last_modified.timestamp_nanos_opt());
        self.size.append_value(meta.size); // this is not lossy assuming we're on a 64-bit platform
    }

    /// Finish all builders into a [`RecordBatch`]; column order must match the
    /// schema constructed in [`FileMetadata::new`].
    fn finish(mut self) -> Result<RecordBatch> {
        RecordBatch::try_new(
            self.schema,
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.file_paths.finish()),
                Arc::new(self.last_modified.finish()),
                Arc::new(self.size.finish()),
            ],
        )
        .map_err(From::from)
    }
}
724
/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider.
#[async_trait]
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
    /// List all files in the store for the given `url` prefix.
    ///
    /// Implementations should yield only objects matching `file_extension`;
    /// `ignore_subdirectory` mirrors the session's
    /// `execution.listing_table_ignore_subdirectory` option.
    async fn list_all_files(
        &self,
        store: Arc<dyn ObjectStore>,
        url: ListingTableUrl,
        file_extension: String,
        ignore_subdirectory: bool,
    ) -> BoxStream<'static, Result<ObjectMeta>>;
}
737
/// Default implementation of the [`FileMetadataProvider`].
///
/// Lists files directly from the [`ObjectStore`] on every call.
#[derive(Debug)]
pub struct DefaultFileMetadataProvider;
741
#[async_trait]
impl FileMetadataProvider for DefaultFileMetadataProvider {
    // Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
    // Modified to handle empty tables
    async fn list_all_files(
        &self,
        store: Arc<dyn ObjectStore>,
        url: ListingTableUrl,
        file_extension: String,
        ignore_subdirectory: bool,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        // Check if the directory exists yet: a NotFound on the prefix means the
        // table has no files, so return an empty stream instead of an error.
        if let Err(object_store::Error::NotFound { path, .. }) =
            store.list_with_delimiter(Some(url.prefix())).await
        {
            debug!(
                "attempted to list empty table at {path} during file_metadata listing, returning empty list"
            );
            return Box::pin(stream::empty());
        }

        // A trailing '/' means a directory prefix to list recursively;
        // otherwise the URL names a single object, which we `head` directly.
        let is_dir = url.as_str().ends_with('/');
        let prefix = url.prefix().clone();

        let list = match is_dir {
            true => store.list(Some(&prefix)),
            false => futures::stream::once(async move { store.head(&prefix).await }).boxed(),
        };

        // Keep only objects matching the table's file extension and the URL's
        // glob/subdirectory rules.
        list.map_err(Into::into)
            .try_filter(move |meta| {
                let path = &meta.location;
                let extension_match = path.as_ref().ends_with(&file_extension);
                let glob_match = url.contains(path, ignore_subdirectory);
                futures::future::ready(extension_match && glob_match)
            })
            .boxed()
    }
}
781
782#[cfg(test)]
783mod test {
784    use std::{ops::Deref, sync::Arc};
785
786    use anyhow::{Context, Result};
787    use datafusion::{
788        assert_batches_sorted_eq,
789        catalog::{MemoryCatalogProvider, MemorySchemaProvider},
790        execution::{
791            object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
792            runtime_env::RuntimeEnvBuilder,
793        },
794        prelude::{SessionConfig, SessionContext},
795    };
796    use object_store::local::LocalFileSystem;
797    use tempfile::TempDir;
798    use url::Url;
799
800    use super::{DefaultFileMetadataProvider, FileMetadata};
801
802    struct TestContext {
803        _dir: TempDir,
804        ctx: SessionContext,
805    }
806
807    impl Deref for TestContext {
808        type Target = SessionContext;
809
810        fn deref(&self) -> &Self::Target {
811            &self.ctx
812        }
813    }
814
815    async fn setup() -> Result<TestContext> {
816        let _ = env_logger::builder().is_test(true).try_init();
817
818        let dir = TempDir::new().context("create tempdir")?;
819        let store = LocalFileSystem::new_with_prefix(&dir)
820            .map(Arc::new)
821            .context("create local file system object store")?;
822
823        let registry = Arc::new(DefaultObjectStoreRegistry::new());
824        registry
825            .register_store(&Url::parse("file://").unwrap(), store)
826            .context("register file system store")
827            .expect("should replace existing object store at file://");
828
829        let ctx = SessionContext::new_with_config_rt(
830            SessionConfig::new(),
831            RuntimeEnvBuilder::new()
832                .with_object_store_registry(registry)
833                .build_arc()
834                .context("create RuntimeEnv")?,
835        );
836
837        ctx.catalog("datafusion")
838            .context("get default catalog")?
839            .register_schema("private", Arc::<MemorySchemaProvider>::default())
840            .context("register datafusion.private schema")?;
841
842        ctx.register_catalog("datafusion_mv", Arc::<MemoryCatalogProvider>::default());
843
844        ctx.catalog("datafusion_mv")
845            .context("get datafusion_mv catalog")?
846            .register_schema("public", Arc::<MemorySchemaProvider>::default())
847            .context("register datafusion_mv.public schema")?;
848
849        ctx.sql(
850            "
851            CREATE EXTERNAL TABLE t1 (num INTEGER, year TEXT)
852            STORED AS CSV
853            PARTITIONED BY (year)
854            LOCATION 'file:///t1/'
855        ",
856        )
857        .await?
858        .collect()
859        .await?;
860
861        ctx.sql(
862            "INSERT INTO t1 VALUES
863            (1, '2021'),
864            (2, '2022'),
865            (3, '2023'),
866            (4, '2024')
867        ",
868        )
869        .await?
870        .collect()
871        .await?;
872
873        ctx.sql(
874            "
875            CREATE EXTERNAL TABLE private.t1 (num INTEGER, year TEXT, month TEXT)
876            STORED AS CSV
877            PARTITIONED BY (year, month)
878            LOCATION 'file:///private/t1/'
879        ",
880        )
881        .await?
882        .collect()
883        .await?;
884
885        ctx.sql(
886            "INSERT INTO private.t1 VALUES
887            (1, '2021', '01'),
888            (2, '2022', '02'),
889            (3, '2023', '03'),
890            (4, '2024', '04')
891        ",
892        )
893        .await?
894        .collect()
895        .await?;
896
897        ctx.sql(
898            "
899            CREATE EXTERNAL TABLE datafusion_mv.public.t3 (num INTEGER, date DATE)
900            STORED AS CSV
901            PARTITIONED BY (date)
902            LOCATION 'file:///datafusion_mv/public/t3/'
903        ",
904        )
905        .await?
906        .collect()
907        .await?;
908
909        ctx.sql(
910            "INSERT INTO datafusion_mv.public.t3 VALUES
911            (1, '2021-01-01'),
912            (2, '2022-02-02'),
913            (3, '2023-03-03'),
914            (4, '2024-04-04')
915        ",
916        )
917        .await?
918        .collect()
919        .await?;
920
921        ctx.register_table(
922            "file_metadata",
923            Arc::new(FileMetadata::new(
924                Arc::clone(ctx.state().catalog_list()),
925                Arc::new(DefaultFileMetadataProvider),
926            )),
927        )
928        .context("register file metadata table")?;
929
930        ctx.sql(
931            // Remove timestamps and trim (randomly generated) file names since they're not stable in tests
932            "CREATE VIEW file_metadata_test_view AS SELECT
933                * EXCLUDE(file_path, last_modified),
934                regexp_replace(file_path, '/[^/]*$', '/') AS file_path
935            FROM file_metadata",
936        )
937        .await
938        .context("create file metadata test view")?;
939
940        Ok(TestContext { _dir: dir, ctx })
941    }
942
943    #[tokio::test]
944    async fn test_list_all_files() -> Result<()> {
945        let ctx = setup().await.context("setup")?;
946
947        let results = ctx
948            .sql("SELECT * FROM file_metadata_test_view")
949            .await?
950            .collect()
951            .await?;
952
953        assert_batches_sorted_eq!(&[
954            "+---------------+--------------+------------+------+--------------------------------------------------+",
955            "| table_catalog | table_schema | table_name | size | file_path                                        |",
956            "+---------------+--------------+------------+------+--------------------------------------------------+",
957            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/           |",
958            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/           |",
959            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/           |",
960            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/           |",
961            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
962            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
963            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
964            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
965            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
966            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
967            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
968            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
969            "+---------------+--------------+------------+------+--------------------------------------------------+",
970        ], &results);
971
972        Ok(())
973    }
974
975    #[tokio::test]
976    async fn test_list_catalog() -> Result<()> {
977        let ctx = setup().await.context("setup")?;
978
979        let results = ctx
980            .sql(
981                "SELECT * FROM file_metadata_test_view
982                WHERE table_catalog = 'datafusion_mv'",
983            )
984            .await?
985            .collect()
986            .await?;
987
988        assert_batches_sorted_eq!(&[
989            "+---------------+--------------+------------+------+--------------------------------------------------+",
990            "| table_catalog | table_schema | table_name | size | file_path                                        |",
991            "+---------------+--------------+------------+------+--------------------------------------------------+",
992            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
993            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
994            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
995            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
996            "+---------------+--------------+------------+------+--------------------------------------------------+",
997        ], &results);
998
999        Ok(())
1000    }
1001
1002    #[tokio::test]
1003    async fn test_list_catalog_and_schema() -> Result<()> {
1004        let ctx = setup().await.context("setup")?;
1005
1006        let results = ctx
1007            .sql(
1008                "SELECT * FROM file_metadata_test_view
1009                WHERE table_catalog = 'datafusion' AND table_schema = 'private'",
1010            )
1011            .await?
1012            .collect()
1013            .await?;
1014
1015        assert_batches_sorted_eq!(&[
1016            "+---------------+--------------+------------+------+----------------------------------------+",
1017            "| table_catalog | table_schema | table_name | size | file_path                              |",
1018            "+---------------+--------------+------------+------+----------------------------------------+",
1019            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
1020            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
1021            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
1022            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
1023            "+---------------+--------------+------------+------+----------------------------------------+",
1024        ], &results);
1025
1026        Ok(())
1027    }
1028
1029    #[tokio::test]
1030    async fn test_list_schema_only() -> Result<()> {
1031        let ctx = setup().await.context("setup")?;
1032
1033        let results = ctx
1034            .sql(
1035                "SELECT * FROM file_metadata_test_view
1036                WHERE table_schema = 'public'",
1037            )
1038            .await?
1039            .collect()
1040            .await?;
1041
1042        assert_batches_sorted_eq!(&[
1043            "+---------------+--------------+------------+------+--------------------------------------------------+",
1044            "| table_catalog | table_schema | table_name | size | file_path                                        |",
1045            "+---------------+--------------+------------+------+--------------------------------------------------+",
1046            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
1047            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
1048            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
1049            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
1050            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
1051            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
1052            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
1053            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
1054            "+---------------+--------------+------------+------+--------------------------------------------------+",
1055        ], &results);
1056
1057        Ok(())
1058    }
1059
1060    #[tokio::test]
1061    async fn test_list_catalog_schema_and_table() -> Result<()> {
1062        let ctx = setup().await.context("setup")?;
1063
1064        let results = ctx
1065            .sql(
1066                "SELECT * FROM file_metadata_test_view
1067                WHERE table_catalog = 'datafusion' AND table_schema = 'public' AND table_name = 't1'",
1068            )
1069            .await?
1070            .collect()
1071            .await?;
1072
1073        assert_batches_sorted_eq!(
1074            &[
1075                "+---------------+--------------+------------+------+-----------------------+",
1076                "| table_catalog | table_schema | table_name | size | file_path             |",
1077                "+---------------+--------------+------------+------+-----------------------+",
1078                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/ |",
1079                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/ |",
1080                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/ |",
1081                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/ |",
1082                "+---------------+--------------+------------+------+-----------------------+",
1083            ],
1084            &results
1085        );
1086
1087        Ok(())
1088    }
1089
1090    #[tokio::test]
1091    async fn test_list_table_only() -> Result<()> {
1092        let ctx = setup().await.context("setup")?;
1093
1094        let results = ctx
1095            .sql(
1096                "SELECT * FROM file_metadata_test_view
1097                WHERE table_name = 't1'",
1098            )
1099            .await?
1100            .collect()
1101            .await?;
1102
1103        assert_batches_sorted_eq!(
1104            &[
1105                "+---------------+--------------+------------+------+----------------------------------------+",
1106                "| table_catalog | table_schema | table_name | size | file_path                              |",
1107                "+---------------+--------------+------------+------+----------------------------------------+",
1108                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
1109                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
1110                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
1111                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
1112                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                  |",
1113                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                  |",
1114                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                  |",
1115                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                  |",
1116                "+---------------+--------------+------------+------+----------------------------------------+",
1117            ],
1118            &results
1119        );
1120
1121        Ok(())
1122    }
1123}