datafusion_materialized_views/materialized/file_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Int64Builder, StringBuilder, UInt64Builder};
19use arrow::record_batch::RecordBatch;
20use arrow_schema::{DataType, Field, Schema, SchemaRef};
21use async_trait::async_trait;
22use datafusion::catalog::SchemaProvider;
23use datafusion::catalog::{CatalogProvider, Session};
24use datafusion::datasource::listing::ListingTableUrl;
25use datafusion::datasource::TableProvider;
26use datafusion::execution::object_store::ObjectStoreUrl;
27use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
28use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
29use datafusion::physical_plan::limit::LimitStream;
30use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
31use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PhysicalExpr,
34    PlanProperties,
35};
36use datafusion::{
37    catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream,
38};
39use datafusion_common::{DataFusionError, Result, ScalarValue, ToDFSchema};
40use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
41use futures::stream::{self, BoxStream};
42use futures::{future, Future, FutureExt, StreamExt, TryStreamExt};
43use itertools::Itertools;
44use log::debug;
45use object_store::{ObjectMeta, ObjectStore};
46use std::any::Any;
47use std::sync::Arc;
48
49use crate::materialized::cast_to_listing_table;
50
/// A virtual file metadata table, inspired by the information schema column table.
///
/// Exposes one row per object-store file backing a listing table registered in
/// `catalog_list`, with columns identifying the table (catalog/schema/name),
/// the file path, its last-modified time, and its size in bytes.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// Fixed schema of the virtual table (built in [`FileMetadata::new`]).
    table_schema: SchemaRef,
    /// Catalogs whose tables are scanned for file metadata.
    catalog_list: Arc<dyn CatalogProviderList>,
}
57
58impl FileMetadata {
59    /// Construct a new [`FileMetadata`] table provider that lists files for all
60    /// tables in the provided catalog list.
61    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
62        Self {
63            table_schema: Arc::new(Schema::new(vec![
64                Field::new("table_catalog", DataType::Utf8, false),
65                Field::new("table_schema", DataType::Utf8, false),
66                Field::new("table_name", DataType::Utf8, false),
67                Field::new("file_path", DataType::Utf8, false),
68                Field::new("last_modified", DataType::Int64, false),
69                Field::new("size", DataType::UInt64, false),
70            ])),
71            catalog_list,
72        }
73    }
74}
75
76#[async_trait]
77impl TableProvider for FileMetadata {
78    fn as_any(&self) -> &dyn Any {
79        self
80    }
81
82    fn schema(&self) -> SchemaRef {
83        self.table_schema.clone()
84    }
85
86    fn table_type(&self) -> TableType {
87        TableType::Base
88    }
89
90    async fn scan(
91        &self,
92        session_state: &dyn Session,
93        projection: Option<&Vec<usize>>,
94        filters: &[Expr],
95        limit: Option<usize>,
96    ) -> Result<Arc<dyn ExecutionPlan>> {
97        let dfschema = self.table_schema.clone().to_dfschema()?;
98
99        let filters = filters
100            .iter()
101            .map(|expr| {
102                create_physical_expr(expr, &dfschema, session_state.execution_props())
103                    .map_err(|e| e.context("failed to create file metadata physical expr"))
104            })
105            .collect::<Result<Vec<_>, _>>()?;
106
107        let exec = FileMetadataExec::try_new(
108            self.table_schema.clone(),
109            projection.cloned(),
110            filters,
111            limit,
112            self.catalog_list.clone(),
113        )?;
114
115        Ok(Arc::new(exec))
116    }
117
118    fn supports_filters_pushdown(
119        &self,
120        filters: &[&Expr],
121    ) -> Result<Vec<TableProviderFilterPushDown>> {
122        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
123    }
124}
125
/// An [`ExecutionPlan`] that scans object store metadata.
pub struct FileMetadataExec {
    /// Full (unprojected) schema of the file metadata table.
    table_schema: SchemaRef,
    /// Cached plan properties (equivalence, partitioning, execution mode).
    plan_properties: PlanProperties,
    /// Optional column projection, as indices into `table_schema`.
    projection: Option<Vec<usize>>,
    /// Physical filter expressions pushed down from the scan.
    filters: Vec<Arc<dyn PhysicalExpr>>,
    /// Optional row limit, enforced via a `LimitStream` during execution.
    limit: Option<usize>,
    /// Metrics set; feeds the baseline metrics of the limit stream.
    metrics: ExecutionPlanMetricsSet,
    /// Catalogs to list file metadata from.
    catalog_list: Arc<dyn CatalogProviderList>,
}
136
137impl FileMetadataExec {
138    fn try_new(
139        table_schema: SchemaRef,
140        projection: Option<Vec<usize>>,
141        filters: Vec<Arc<dyn PhysicalExpr>>,
142        limit: Option<usize>,
143        catalog_list: Arc<dyn CatalogProviderList>,
144    ) -> Result<Self> {
145        let projected_schema = match projection.as_ref() {
146            Some(projection) => Arc::new(table_schema.project(projection)?),
147            None => table_schema.clone(),
148        };
149        let eq_properties = EquivalenceProperties::new(projected_schema);
150        let partitioning = Partitioning::UnknownPartitioning(1);
151        let execution_mode = ExecutionMode::Bounded;
152        let plan_properties = PlanProperties::new(eq_properties, partitioning, execution_mode);
153
154        let exec = Self {
155            table_schema,
156            plan_properties,
157            projection,
158            filters,
159            limit,
160            metrics: ExecutionPlanMetricsSet::new(),
161            catalog_list,
162        };
163
164        Ok(exec)
165    }
166}
167
impl ExecutionPlan for FileMetadataExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "FileMetadataExec"
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    /// Leaf node: this plan reads object store metadata directly and has no
    /// child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // No children to replace; return self unchanged.
        Ok(self)
    }

    /// Execute the scan: list file metadata, apply the column projection to
    /// each produced batch, and optionally enforce a row limit.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let projection = self.projection.clone();
        // Future resolving to the (unprojected) record batches.
        let record_batches = self.build_record_batch(context)?;

        // Map the future so each batch is projected (or passed through) once
        // the listing completes; an error becomes a single-element error vec.
        let projected_record_batches = record_batches.map(move |record_batches| {
            let record_batches = match record_batches {
                Ok(record_batches) => record_batches,
                Err(err) => return vec![Err(err)],
            };

            if let Some(projection) = projection {
                return record_batches
                    .into_iter()
                    .map(|record_batch| {
                        record_batch
                            .project(&projection)
                            .map_err(|e| DataFusionError::ArrowError(e, None))
                    })
                    .collect::<Vec<_>>();
            }

            record_batches.into_iter().map(Ok).collect::<Vec<_>>()
        });

        // Flatten the future-of-Vec into a stream of record batches.
        let mut record_batch_stream: SendableRecordBatchStream =
            Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(projected_record_batches)
                    .map(stream::iter)
                    .flatten(),
            ));

        // Wrap in a LimitStream when a limit was pushed down, recording
        // baseline metrics for this partition.
        if let Some(limit) = self.limit {
            let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
            let limit_stream =
                LimitStream::new(record_batch_stream, 0, Some(limit), baseline_metrics);
            record_batch_stream = Box::pin(limit_stream);
        }

        Ok(record_batch_stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
242
243impl FileMetadataExec {
244    fn get_column_index(&self, column_name: &str) -> Result<usize> {
245        let (index, _) = self
246            .table_schema
247            .column_with_name(column_name)
248            .ok_or_else(|| {
249                DataFusionError::Internal(format!("column '{column_name}' does not exists"))
250            })?;
251        Ok(index)
252    }
253
254    /// Get the string literal value from an 'equals' BinaryExpr with a column.
255    fn get_column_literal(column_idx: usize, filter: &Arc<dyn PhysicalExpr>) -> Option<String> {
256        let binary_expr = filter.as_any().downcast_ref::<BinaryExpr>()?;
257
258        if !matches!(binary_expr.op(), Operator::Eq) {
259            return None;
260        }
261
262        let (column, literal) = if let Some(left_column) =
263            binary_expr.left().as_any().downcast_ref::<Column>()
264        {
265            let right_literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
266            (left_column, right_literal)
267        } else if let Some(right_column) = binary_expr.right().as_any().downcast_ref::<Column>() {
268            let left_literal = binary_expr.left().as_any().downcast_ref::<Literal>()?;
269            (right_column, left_literal)
270        } else {
271            return None;
272        };
273
274        if column.index() != column_idx {
275            return None;
276        }
277
278        match literal.value() {
279            ScalarValue::Utf8(str) => str.clone(),
280            ScalarValue::LargeUtf8(str) => str.clone(),
281            _ => None,
282        }
283    }
284
285    /// Builds a RecordBatch containing file metadata that satisfies the provided filters.
286    fn build_record_batch(
287        &self,
288        context: Arc<TaskContext>,
289    ) -> Result<impl Future<Output = Result<Vec<RecordBatch>>>> {
290        let catalog_column = self.get_column_index("table_catalog")?;
291        let schema_column = self.get_column_index("table_schema")?;
292        let table_column = self.get_column_index("table_name")?;
293
294        let catalog_name = self
295            .filters
296            .iter()
297            .filter_map(|filter| Self::get_column_literal(catalog_column, filter))
298            .next();
299
300        let schema_name = self
301            .filters
302            .iter()
303            .filter_map(|filter| Self::get_column_literal(schema_column, filter))
304            .next();
305
306        let table_name = self
307            .filters
308            .iter()
309            .filter_map(|filter| Self::get_column_literal(table_column, filter))
310            .next();
311
312        let table_schema = self.table_schema.clone();
313        let catalog_list = self.catalog_list.clone();
314
315        let record_batch = async move {
316            // If we cannot determine the catalog, build from the entire catalog list.
317            let catalog_name = match catalog_name {
318                Some(catalog_name) => catalog_name,
319                None => {
320                    debug!("No catalog filter exists, returning entire catalog list.");
321                    return FileMetadataBuilder::build_from_catalog_list(
322                        catalog_list,
323                        table_schema,
324                        context,
325                    )
326                    .await;
327                }
328            };
329
330            // If the specified catalog doesn't exist, return an empty result;
331            let catalog_provider = match catalog_list.catalog(&catalog_name) {
332                Some(catalog_provider) => catalog_provider,
333                None => {
334                    debug!("No catalog named '{catalog_name}' exists, returning an empty result.");
335                    return Ok(vec![]);
336                }
337            };
338
339            // If we cannot determine the schema, build from the catalog.
340            let schema_name = match schema_name {
341                Some(schema_name) => schema_name,
342                None => {
343                    debug!("No schema filter exists, returning catalog '{catalog_name}'.");
344                    return FileMetadataBuilder::build_from_catalog(
345                        &catalog_name,
346                        catalog_provider,
347                        table_schema,
348                        context,
349                    )
350                    .await;
351                }
352            };
353
354            // If the specified schema doesn't exist, return an empty result.
355            let schema_provider = match catalog_provider.schema(&schema_name) {
356                Some(schema_provider) => schema_provider,
357                None => {
358                    debug!("No schema named '{catalog_name}.{schema_name}' exists, returning an empty result.");
359                    return Ok(vec![]);
360                }
361            };
362
363            // If we cannot determine a table , build from the schema.
364            let table_name = match table_name {
365                Some(table_name) => table_name,
366                None => {
367                    debug!(
368                        "No table filter exists, returning schema '{catalog_name}.{schema_name}'."
369                    );
370                    return FileMetadataBuilder::build_from_schema(
371                        &catalog_name,
372                        &schema_name,
373                        schema_provider,
374                        table_schema,
375                        context,
376                    )
377                    .await;
378                }
379            };
380
381            // If the specified table doesn't exist, return an empty result;
382            let table_provider = match schema_provider.table(&table_name).await? {
383                Some(table_provider) => table_provider,
384                None => {
385                    debug!("No table named '{catalog_name}.{schema_name}.{table_name}' exists, returning an empty result.");
386                    return Ok(vec![]);
387                }
388            };
389
390            debug!("Returning table '{catalog_name}.{schema_name}.{table_name}'.");
391
392            let record_batch = FileMetadataBuilder::build_from_table(
393                &catalog_name,
394                &schema_name,
395                &table_name,
396                table_provider,
397                table_schema,
398                context,
399            )
400            .await?;
401
402            Ok(record_batch.into_iter().collect_vec())
403        };
404
405        Ok(record_batch)
406    }
407}
408
409impl std::fmt::Debug for FileMetadataExec {
410    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
411        f.debug_struct("FileMetadataExec")
412            .field("plan_properties", &self.plan_properties)
413            .field("filters", &self.filters)
414            .field("limit", &self.limit)
415            .finish_non_exhaustive()
416    }
417}
418
419impl DisplayAs for FileMetadataExec {
420    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
421        write!(f, "FileMetadataExec: ")?;
422
423        write!(f, "filters=[")?;
424        let mut filters = self.filters.iter().peekable();
425        while let Some(filter) = filters.next() {
426            std::fmt::Display::fmt(filter, f)?;
427            if filters.peek().is_some() {
428                write!(f, ", ")?;
429            }
430        }
431        write!(f, "]")?;
432
433        if let Some(limit) = &self.limit {
434            write!(f, ", limit={limit}")?;
435        }
436
437        Ok(())
438    }
439}
440
/// Incrementally accumulates file metadata rows, one column builder per
/// output column, and produces a [`RecordBatch`] via `finish`.
struct FileMetadataBuilder {
    /// Output schema of the produced batch.
    schema: SchemaRef,
    /// Catalog name of the table owning each file.
    catalog_names: StringBuilder,
    /// Schema name of the table owning each file.
    schema_names: StringBuilder,
    /// Name of the table owning each file.
    table_names: StringBuilder,
    /// Full object store URL of each file.
    file_paths: StringBuilder,
    /// Last-modified timestamps (nanoseconds since epoch, when available).
    last_modified: Int64Builder,
    /// File sizes in bytes.
    size: UInt64Builder,
}
450
451impl FileMetadataBuilder {
452    fn new(schema: SchemaRef) -> Self {
453        Self {
454            schema,
455            catalog_names: StringBuilder::new(),
456            schema_names: StringBuilder::new(),
457            table_names: StringBuilder::new(),
458            file_paths: StringBuilder::new(),
459            last_modified: Int64Builder::new(),
460            size: UInt64Builder::new(),
461        }
462    }
463
464    async fn build_from_catalog_list(
465        catalog_list: Arc<dyn CatalogProviderList>,
466        schema: SchemaRef,
467        context: Arc<TaskContext>,
468    ) -> Result<Vec<RecordBatch>> {
469        let mut tasks = vec![];
470
471        for catalog_name in catalog_list.catalog_names() {
472            let catalog_provider = match catalog_list.catalog(&catalog_name) {
473                Some(catalog_provider) => catalog_provider,
474                None => continue,
475            };
476            let schema = schema.clone();
477            let context = context.clone();
478
479            tasks.push(async move {
480                Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await
481            });
482        }
483
484        let results = future::join_all(tasks).await;
485
486        let record_batches = results
487            .into_iter()
488            .collect::<Result<Vec<_>, _>>()?
489            .into_iter()
490            .flatten()
491            .collect();
492
493        Ok(record_batches)
494    }
495
496    async fn build_from_catalog(
497        catalog_name: &str,
498        catalog_provider: Arc<dyn CatalogProvider>,
499        schema: SchemaRef,
500        context: Arc<TaskContext>,
501    ) -> Result<Vec<RecordBatch>> {
502        let mut tasks = vec![];
503
504        for schema_name in catalog_provider.schema_names() {
505            let schema_provider = match catalog_provider.schema(&schema_name) {
506                Some(schema_provider) => schema_provider,
507                None => continue,
508            };
509            let schema = schema.clone();
510            let context = context.clone();
511
512            tasks.push(async move {
513                Self::build_from_schema(
514                    catalog_name,
515                    &schema_name,
516                    schema_provider,
517                    schema,
518                    context,
519                )
520                .await
521            });
522        }
523
524        let results = future::join_all(tasks).await;
525
526        let record_batches = results
527            .into_iter()
528            .collect::<Result<Vec<_>, _>>()?
529            .into_iter()
530            .flatten()
531            .collect();
532
533        Ok(record_batches)
534    }
535
536    async fn build_from_schema(
537        catalog_name: &str,
538        schema_name: &str,
539        schema_provider: Arc<dyn SchemaProvider>,
540        schema: SchemaRef,
541        context: Arc<TaskContext>,
542    ) -> Result<Vec<RecordBatch>> {
543        let mut tasks = vec![];
544
545        for table_name in schema_provider.table_names() {
546            let table_provider = match schema_provider.table(&table_name).await? {
547                Some(table_provider) => table_provider,
548                None => continue,
549            };
550            let schema = schema.clone();
551            let context = context.clone();
552
553            tasks.push(async move {
554                Self::build_from_table(
555                    catalog_name,
556                    schema_name,
557                    &table_name,
558                    table_provider,
559                    schema,
560                    context,
561                )
562                .await
563            })
564        }
565
566        let results = future::join_all(tasks).await;
567        let record_batches = results
568            .into_iter()
569            .collect::<Result<Vec<_>, _>>()?
570            .into_iter()
571            .flatten()
572            .collect();
573
574        Ok(record_batches)
575    }
576
577    async fn build_from_table(
578        catalog_name: &str,
579        schema_name: &str,
580        table_name: &str,
581        table_provider: Arc<dyn TableProvider>,
582        schema: SchemaRef,
583        context: Arc<TaskContext>,
584    ) -> Result<Option<RecordBatch>> {
585        let mut builder = Self::new(schema.clone());
586
587        let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
588            None => return Ok(None),
589            Some(t) => t,
590        };
591
592        let table_paths = listing_table_like.table_paths();
593        let file_extension = listing_table_like.file_ext();
594
595        for table_path in table_paths {
596            builder
597                .read_table_files(
598                    catalog_name,
599                    schema_name,
600                    table_name,
601                    &table_path,
602                    &file_extension,
603                    &context,
604                )
605                .await?;
606        }
607
608        builder.finish().map(Some)
609    }
610
611    async fn read_table_files(
612        &mut self,
613        catalog_name: &str,
614        schema_name: &str,
615        table_name: &str,
616        table_path: &ListingTableUrl,
617        file_ext: &str,
618        context: &TaskContext,
619    ) -> Result<()> {
620        let store_url = table_path.object_store();
621        let store = context.runtime_env().object_store(table_path)?;
622
623        let mut file_stream = list_all_files(
624            store.as_ref(),
625            table_path,
626            file_ext,
627            context
628                .session_config()
629                .options()
630                .execution
631                .listing_table_ignore_subdirectory,
632        )
633        .await;
634
635        while let Some(file_meta) = file_stream.try_next().await? {
636            self.append(
637                catalog_name,
638                schema_name,
639                table_name,
640                &store_url,
641                &file_meta,
642            );
643        }
644
645        Ok(())
646    }
647
648    fn append(
649        &mut self,
650        catalog_name: &str,
651        schema_name: &str,
652        table_name: &str,
653        store_url: &ObjectStoreUrl,
654        meta: &ObjectMeta,
655    ) {
656        self.catalog_names.append_value(catalog_name);
657        self.schema_names.append_value(schema_name);
658        self.table_names.append_value(table_name);
659        self.file_paths
660            .append_value(format!("{store_url}{}", meta.location));
661        self.last_modified
662            .append_option(meta.last_modified.timestamp_nanos_opt());
663        self.size.append_value(meta.size as u64); // this is not lossy assuming we're on a 64-bit platform
664    }
665
666    fn finish(mut self) -> Result<RecordBatch> {
667        RecordBatch::try_new(
668            self.schema,
669            vec![
670                Arc::new(self.catalog_names.finish()),
671                Arc::new(self.schema_names.finish()),
672                Arc::new(self.table_names.finish()),
673                Arc::new(self.file_paths.finish()),
674                Arc::new(self.last_modified.finish()),
675                Arc::new(self.size.finish()),
676            ],
677        )
678        .map_err(From::from)
679    }
680}
681
682// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
683// Modified to handle empty tables
684async fn list_all_files<'a>(
685    store: &'a dyn ObjectStore,
686    url: &'a ListingTableUrl,
687    file_extension: &'a str,
688    ignore_subdirectory: bool,
689) -> BoxStream<'a, Result<ObjectMeta>> {
690    // Check if the directory exists yet
691    if let Err(object_store::Error::NotFound { path, .. }) =
692        store.list_with_delimiter(Some(url.prefix())).await
693    {
694        debug!(
695                "attempted to list empty table at {path} during file_metadata listing, returning empty list"
696            );
697        return Box::pin(stream::empty());
698    }
699
700    let is_dir = url.as_str().ends_with('/');
701    let list = match is_dir {
702        true => store.list(Some(url.prefix())),
703        false => futures::stream::once(store.head(url.prefix())).boxed(),
704    };
705
706    list.map_err(Into::into)
707        .try_filter(move |meta| {
708            let path = &meta.location;
709            let extension_match = path.as_ref().ends_with(file_extension);
710            let glob_match = url.contains(path, ignore_subdirectory);
711            futures::future::ready(extension_match && glob_match)
712        })
713        .boxed()
714}
715
716#[cfg(test)]
717mod test {
718    use std::{ops::Deref, sync::Arc};
719
720    use anyhow::{Context, Result};
721    use datafusion::{
722        assert_batches_sorted_eq,
723        catalog_common::{MemoryCatalogProvider, MemorySchemaProvider},
724        execution::{
725            object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
726            runtime_env::RuntimeEnvBuilder,
727        },
728        prelude::{SessionConfig, SessionContext},
729    };
730    use object_store::local::LocalFileSystem;
731    use tempfile::TempDir;
732    use url::Url;
733
734    use super::FileMetadata;
735
736    struct TestContext {
737        _dir: TempDir,
738        ctx: SessionContext,
739    }
740
741    impl Deref for TestContext {
742        type Target = SessionContext;
743
744        fn deref(&self) -> &Self::Target {
745            &self.ctx
746        }
747    }
748
749    async fn setup() -> Result<TestContext> {
750        let _ = env_logger::builder().is_test(true).try_init();
751
752        let dir = TempDir::new().context("create tempdir")?;
753        let store = LocalFileSystem::new_with_prefix(&dir)
754            .map(Arc::new)
755            .context("create local file system object store")?;
756
757        let registry = Arc::new(DefaultObjectStoreRegistry::new());
758        registry
759            .register_store(&Url::parse("file://").unwrap(), store)
760            .context("register file system store")
761            .expect("should replace existing object store at file://");
762
763        let ctx = SessionContext::new_with_config_rt(
764            SessionConfig::new(),
765            RuntimeEnvBuilder::new()
766                .with_object_store_registry(registry)
767                .build_arc()
768                .context("create RuntimeEnv")?,
769        );
770
771        ctx.catalog("datafusion")
772            .context("get default catalog")?
773            .register_schema("private", Arc::<MemorySchemaProvider>::default())
774            .context("register datafusion.private schema")?;
775
776        ctx.register_catalog("datafusion_mv", Arc::<MemoryCatalogProvider>::default());
777
778        ctx.catalog("datafusion_mv")
779            .context("get datafusion_mv catalog")?
780            .register_schema("public", Arc::<MemorySchemaProvider>::default())
781            .context("register datafusion_mv.public schema")?;
782
783        ctx.sql(
784            "
785            CREATE EXTERNAL TABLE t1 (num INTEGER, year TEXT)
786            STORED AS CSV
787            PARTITIONED BY (year)
788            LOCATION 'file:///t1/'
789        ",
790        )
791        .await?
792        .collect()
793        .await?;
794
795        ctx.sql(
796            "INSERT INTO t1 VALUES 
797            (1, '2021'),
798            (2, '2022'),
799            (3, '2023'),
800            (4, '2024')
801        ",
802        )
803        .await?
804        .collect()
805        .await?;
806
807        ctx.sql(
808            "
809            CREATE EXTERNAL TABLE private.t1 (num INTEGER, year TEXT, month TEXT)
810            STORED AS CSV
811            PARTITIONED BY (year, month)
812            LOCATION 'file:///private/t1/'
813        ",
814        )
815        .await?
816        .collect()
817        .await?;
818
819        ctx.sql(
820            "INSERT INTO private.t1 VALUES 
821            (1, '2021', '01'),
822            (2, '2022', '02'),
823            (3, '2023', '03'),
824            (4, '2024', '04')
825        ",
826        )
827        .await?
828        .collect()
829        .await?;
830
831        ctx.sql(
832            "
833            CREATE EXTERNAL TABLE datafusion_mv.public.t3 (num INTEGER, date DATE)
834            STORED AS CSV
835            PARTITIONED BY (date)
836            LOCATION 'file:///datafusion_mv/public/t3/'
837        ",
838        )
839        .await?
840        .collect()
841        .await?;
842
843        ctx.sql(
844            "INSERT INTO datafusion_mv.public.t3 VALUES 
845            (1, '2021-01-01'),
846            (2, '2022-02-02'),
847            (3, '2023-03-03'),
848            (4, '2024-04-04')
849        ",
850        )
851        .await?
852        .collect()
853        .await?;
854
855        ctx.register_table(
856            "file_metadata",
857            Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))),
858        )
859        .context("register file metadata table")?;
860
861        ctx.sql(
862            // Remove timestamps and trim (randomly generated) file names since they're not stable in tests
863            "CREATE VIEW file_metadata_test_view AS SELECT
864                * EXCLUDE(file_path, last_modified), 
865                regexp_replace(file_path, '/[^/]*$', '/') AS file_path 
866            FROM file_metadata",
867        )
868        .await
869        .context("create file metadata test view")?;
870
871        Ok(TestContext { _dir: dir, ctx })
872    }
873
874    #[tokio::test]
875    async fn test_list_all_files() -> Result<()> {
876        let ctx = setup().await.context("setup")?;
877
878        let results = ctx
879            .sql("SELECT * FROM file_metadata_test_view")
880            .await?
881            .collect()
882            .await?;
883
884        assert_batches_sorted_eq!(&[
885            "+---------------+--------------+------------+------+--------------------------------------------------+",
886            "| table_catalog | table_schema | table_name | size | file_path                                        |",
887            "+---------------+--------------+------------+------+--------------------------------------------------+",
888            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/           |",
889            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/           |",
890            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/           |",
891            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/           |",
892            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
893            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
894            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
895            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
896            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
897            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
898            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
899            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
900            "+---------------+--------------+------------+------+--------------------------------------------------+",
901        ], &results);
902
903        Ok(())
904    }
905
906    #[tokio::test]
907    async fn test_list_catalog() -> Result<()> {
908        let ctx = setup().await.context("setup")?;
909
910        let results = ctx
911            .sql(
912                "SELECT * FROM file_metadata_test_view
913                WHERE table_catalog = 'datafusion_mv'",
914            )
915            .await?
916            .collect()
917            .await?;
918
919        assert_batches_sorted_eq!(&[
920            "+---------------+--------------+------------+------+--------------------------------------------------+",
921            "| table_catalog | table_schema | table_name | size | file_path                                        |",
922            "+---------------+--------------+------------+------+--------------------------------------------------+",
923            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
924            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
925            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
926            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
927            "+---------------+--------------+------------+------+--------------------------------------------------+",
928        ], &results);
929
930        Ok(())
931    }
932
933    #[tokio::test]
934    async fn test_list_catalog_and_schema() -> Result<()> {
935        let ctx = setup().await.context("setup")?;
936
937        let results = ctx
938            .sql(
939                "SELECT * FROM file_metadata_test_view
940                WHERE table_catalog = 'datafusion' AND table_schema = 'private'",
941            )
942            .await?
943            .collect()
944            .await?;
945
946        assert_batches_sorted_eq!(&[
947            "+---------------+--------------+------------+------+----------------------------------------+",
948            "| table_catalog | table_schema | table_name | size | file_path                              |",
949            "+---------------+--------------+------------+------+----------------------------------------+",
950            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
951            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
952            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
953            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
954            "+---------------+--------------+------------+------+----------------------------------------+",
955        ], &results);
956
957        Ok(())
958    }
959
960    #[tokio::test]
961    async fn test_list_schema_only() -> Result<()> {
962        let ctx = setup().await.context("setup")?;
963
964        let results = ctx
965            .sql(
966                "SELECT * FROM file_metadata_test_view
967                WHERE table_schema = 'public'",
968            )
969            .await?
970            .collect()
971            .await?;
972
973        assert_batches_sorted_eq!(&[
974            "+---------------+--------------+------------+------+--------------------------------------------------+",
975            "| table_catalog | table_schema | table_name | size | file_path                                        |",
976            "+---------------+--------------+------------+------+--------------------------------------------------+",
977            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
978            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
979            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
980            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
981            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
982            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
983            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
984            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
985            "+---------------+--------------+------------+------+--------------------------------------------------+",
986        ], &results);
987
988        Ok(())
989    }
990
991    #[tokio::test]
992    async fn test_list_catalog_schema_and_table() -> Result<()> {
993        let ctx = setup().await.context("setup")?;
994
995        let results = ctx
996            .sql(
997                "SELECT * FROM file_metadata_test_view
998                WHERE table_catalog = 'datafusion' AND table_schema = 'public' AND table_name = 't1'",
999            )
1000            .await?
1001            .collect()
1002            .await?;
1003
1004        assert_batches_sorted_eq!(
1005            &[
1006                "+---------------+--------------+------------+------+-----------------------+",
1007                "| table_catalog | table_schema | table_name | size | file_path             |",
1008                "+---------------+--------------+------------+------+-----------------------+",
1009                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/ |",
1010                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/ |",
1011                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/ |",
1012                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/ |",
1013                "+---------------+--------------+------------+------+-----------------------+",
1014            ],
1015            &results
1016        );
1017
1018        Ok(())
1019    }
1020
1021    #[tokio::test]
1022    async fn test_list_table_only() -> Result<()> {
1023        let ctx = setup().await.context("setup")?;
1024
1025        let results = ctx
1026            .sql(
1027                "SELECT * FROM file_metadata_test_view
1028                WHERE table_name = 't1'",
1029            )
1030            .await?
1031            .collect()
1032            .await?;
1033
1034        assert_batches_sorted_eq!(
1035            &[
1036                "+---------------+--------------+------------+------+----------------------------------------+",
1037                "| table_catalog | table_schema | table_name | size | file_path                              |",
1038                "+---------------+--------------+------------+------+----------------------------------------+",
1039                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
1040                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
1041                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
1042                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
1043                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                  |",
1044                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                  |",
1045                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                  |",
1046                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                  |",
1047                "+---------------+--------------+------------+------+----------------------------------------+",
1048            ],
1049            &results
1050        );
1051
1052        Ok(())
1053    }
1054}