1use arrow::array::{Int64Builder, StringBuilder, UInt64Builder};
19use arrow::record_batch::RecordBatch;
20use arrow_schema::{DataType, Field, Schema, SchemaRef};
21use async_trait::async_trait;
22use datafusion::catalog::SchemaProvider;
23use datafusion::catalog::{CatalogProvider, Session};
24use datafusion::datasource::listing::ListingTableUrl;
25use datafusion::datasource::TableProvider;
26use datafusion::execution::object_store::ObjectStoreUrl;
27use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
28use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
29use datafusion::physical_plan::limit::LimitStream;
30use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
31use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PhysicalExpr,
34 PlanProperties,
35};
36use datafusion::{
37 catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream,
38};
39use datafusion_common::{DataFusionError, Result, ScalarValue, ToDFSchema};
40use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
41use futures::stream::{self, BoxStream};
42use futures::{future, Future, FutureExt, StreamExt, TryStreamExt};
43use itertools::Itertools;
44use log::debug;
45use object_store::{ObjectMeta, ObjectStore};
46use std::any::Any;
47use std::sync::Arc;
48
49use crate::materialized::cast_to_listing_table;
50
/// A virtual table that lists the object-store files backing every
/// listing-style table registered in a catalog list.
///
/// Columns: `table_catalog`, `table_schema`, `table_name`, `file_path`,
/// `last_modified` (nanoseconds since epoch), `size` (bytes).
#[derive(Debug, Clone)]
pub struct FileMetadata {
    // Fixed six-column output schema, built once in `FileMetadata::new`.
    table_schema: SchemaRef,
    // Root of the catalog hierarchy enumerated during scans.
    catalog_list: Arc<dyn CatalogProviderList>,
}
57
58impl FileMetadata {
59 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
62 Self {
63 table_schema: Arc::new(Schema::new(vec![
64 Field::new("table_catalog", DataType::Utf8, false),
65 Field::new("table_schema", DataType::Utf8, false),
66 Field::new("table_name", DataType::Utf8, false),
67 Field::new("file_path", DataType::Utf8, false),
68 Field::new("last_modified", DataType::Int64, false),
69 Field::new("size", DataType::UInt64, false),
70 ])),
71 catalog_list,
72 }
73 }
74}
75
76#[async_trait]
77impl TableProvider for FileMetadata {
78 fn as_any(&self) -> &dyn Any {
79 self
80 }
81
82 fn schema(&self) -> SchemaRef {
83 self.table_schema.clone()
84 }
85
86 fn table_type(&self) -> TableType {
87 TableType::Base
88 }
89
90 async fn scan(
91 &self,
92 session_state: &dyn Session,
93 projection: Option<&Vec<usize>>,
94 filters: &[Expr],
95 limit: Option<usize>,
96 ) -> Result<Arc<dyn ExecutionPlan>> {
97 let dfschema = self.table_schema.clone().to_dfschema()?;
98
99 let filters = filters
100 .iter()
101 .map(|expr| {
102 create_physical_expr(expr, &dfschema, session_state.execution_props())
103 .map_err(|e| e.context("failed to create file metadata physical expr"))
104 })
105 .collect::<Result<Vec<_>, _>>()?;
106
107 let exec = FileMetadataExec::try_new(
108 self.table_schema.clone(),
109 projection.cloned(),
110 filters,
111 limit,
112 self.catalog_list.clone(),
113 )?;
114
115 Ok(Arc::new(exec))
116 }
117
118 fn supports_filters_pushdown(
119 &self,
120 filters: &[&Expr],
121 ) -> Result<Vec<TableProviderFilterPushDown>> {
122 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
123 }
124}
125
/// Physical plan node that produces the rows of the `file_metadata` table
/// by walking the registered catalogs at execution time.
pub struct FileMetadataExec {
    // Unprojected table schema; used to resolve filter column indices.
    table_schema: SchemaRef,
    // Precomputed properties (projected schema, single bounded partition).
    plan_properties: PlanProperties,
    // Optional column projection applied to every produced batch.
    projection: Option<Vec<usize>>,
    // Pushed-down physical filters; equality filters on catalog/schema/table
    // are used to prune the catalog walk.
    filters: Vec<Arc<dyn PhysicalExpr>>,
    // Optional row limit, enforced via a LimitStream in `execute`.
    limit: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    catalog_list: Arc<dyn CatalogProviderList>,
}
136
137impl FileMetadataExec {
138 fn try_new(
139 table_schema: SchemaRef,
140 projection: Option<Vec<usize>>,
141 filters: Vec<Arc<dyn PhysicalExpr>>,
142 limit: Option<usize>,
143 catalog_list: Arc<dyn CatalogProviderList>,
144 ) -> Result<Self> {
145 let projected_schema = match projection.as_ref() {
146 Some(projection) => Arc::new(table_schema.project(projection)?),
147 None => table_schema.clone(),
148 };
149 let eq_properties = EquivalenceProperties::new(projected_schema);
150 let partitioning = Partitioning::UnknownPartitioning(1);
151 let execution_mode = ExecutionMode::Bounded;
152 let plan_properties = PlanProperties::new(eq_properties, partitioning, execution_mode);
153
154 let exec = Self {
155 table_schema,
156 plan_properties,
157 projection,
158 filters,
159 limit,
160 metrics: ExecutionPlanMetricsSet::new(),
161 catalog_list,
162 };
163
164 Ok(exec)
165 }
166}
167
impl ExecutionPlan for FileMetadataExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "FileMetadataExec"
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    // Leaf node: reads directly from the catalog list, no input plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Streams the metadata batches for the (single) partition, applying the
    /// optional column projection per batch and the optional row `limit` via
    /// a `LimitStream`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let projection = self.projection.clone();
        // Future producing all batches; runs only once the stream is polled.
        let record_batches = self.build_record_batch(context)?;

        let projected_record_batches = record_batches.map(move |record_batches| {
            let record_batches = match record_batches {
                Ok(record_batches) => record_batches,
                // Surface the build error as a single-element error stream.
                Err(err) => return vec![Err(err)],
            };

            if let Some(projection) = projection {
                return record_batches
                    .into_iter()
                    .map(|record_batch| {
                        record_batch
                            .project(&projection)
                            .map_err(|e| DataFusionError::ArrowError(e, None))
                    })
                    .collect::<Vec<_>>();
            }

            record_batches.into_iter().map(Ok).collect::<Vec<_>>()
        });

        // Flatten the one future of Vec<Result<RecordBatch>> into a stream
        // of record batches with this node's (projected) schema.
        let mut record_batch_stream: SendableRecordBatchStream =
            Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(projected_record_batches)
                    .map(stream::iter)
                    .flatten(),
            ));

        if let Some(limit) = self.limit {
            let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
            let limit_stream =
                LimitStream::new(record_batch_stream, 0, Some(limit), baseline_metrics);
            record_batch_stream = Box::pin(limit_stream);
        }

        Ok(record_batch_stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
242
243impl FileMetadataExec {
244 fn get_column_index(&self, column_name: &str) -> Result<usize> {
245 let (index, _) = self
246 .table_schema
247 .column_with_name(column_name)
248 .ok_or_else(|| {
249 DataFusionError::Internal(format!("column '{column_name}' does not exists"))
250 })?;
251 Ok(index)
252 }
253
254 fn get_column_literal(column_idx: usize, filter: &Arc<dyn PhysicalExpr>) -> Option<String> {
256 let binary_expr = filter.as_any().downcast_ref::<BinaryExpr>()?;
257
258 if !matches!(binary_expr.op(), Operator::Eq) {
259 return None;
260 }
261
262 let (column, literal) = if let Some(left_column) =
263 binary_expr.left().as_any().downcast_ref::<Column>()
264 {
265 let right_literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
266 (left_column, right_literal)
267 } else if let Some(right_column) = binary_expr.right().as_any().downcast_ref::<Column>() {
268 let left_literal = binary_expr.left().as_any().downcast_ref::<Literal>()?;
269 (right_column, left_literal)
270 } else {
271 return None;
272 };
273
274 if column.index() != column_idx {
275 return None;
276 }
277
278 match literal.value() {
279 ScalarValue::Utf8(str) => str.clone(),
280 ScalarValue::LargeUtf8(str) => str.clone(),
281 _ => None,
282 }
283 }
284
285 fn build_record_batch(
287 &self,
288 context: Arc<TaskContext>,
289 ) -> Result<impl Future<Output = Result<Vec<RecordBatch>>>> {
290 let catalog_column = self.get_column_index("table_catalog")?;
291 let schema_column = self.get_column_index("table_schema")?;
292 let table_column = self.get_column_index("table_name")?;
293
294 let catalog_name = self
295 .filters
296 .iter()
297 .filter_map(|filter| Self::get_column_literal(catalog_column, filter))
298 .next();
299
300 let schema_name = self
301 .filters
302 .iter()
303 .filter_map(|filter| Self::get_column_literal(schema_column, filter))
304 .next();
305
306 let table_name = self
307 .filters
308 .iter()
309 .filter_map(|filter| Self::get_column_literal(table_column, filter))
310 .next();
311
312 let table_schema = self.table_schema.clone();
313 let catalog_list = self.catalog_list.clone();
314
315 let record_batch = async move {
316 let catalog_name = match catalog_name {
318 Some(catalog_name) => catalog_name,
319 None => {
320 debug!("No catalog filter exists, returning entire catalog list.");
321 return FileMetadataBuilder::build_from_catalog_list(
322 catalog_list,
323 table_schema,
324 context,
325 )
326 .await;
327 }
328 };
329
330 let catalog_provider = match catalog_list.catalog(&catalog_name) {
332 Some(catalog_provider) => catalog_provider,
333 None => {
334 debug!("No catalog named '{catalog_name}' exists, returning an empty result.");
335 return Ok(vec![]);
336 }
337 };
338
339 let schema_name = match schema_name {
341 Some(schema_name) => schema_name,
342 None => {
343 debug!("No schema filter exists, returning catalog '{catalog_name}'.");
344 return FileMetadataBuilder::build_from_catalog(
345 &catalog_name,
346 catalog_provider,
347 table_schema,
348 context,
349 )
350 .await;
351 }
352 };
353
354 let schema_provider = match catalog_provider.schema(&schema_name) {
356 Some(schema_provider) => schema_provider,
357 None => {
358 debug!("No schema named '{catalog_name}.{schema_name}' exists, returning an empty result.");
359 return Ok(vec![]);
360 }
361 };
362
363 let table_name = match table_name {
365 Some(table_name) => table_name,
366 None => {
367 debug!(
368 "No table filter exists, returning schema '{catalog_name}.{schema_name}'."
369 );
370 return FileMetadataBuilder::build_from_schema(
371 &catalog_name,
372 &schema_name,
373 schema_provider,
374 table_schema,
375 context,
376 )
377 .await;
378 }
379 };
380
381 let table_provider = match schema_provider.table(&table_name).await? {
383 Some(table_provider) => table_provider,
384 None => {
385 debug!("No table named '{catalog_name}.{schema_name}.{table_name}' exists, returning an empty result.");
386 return Ok(vec![]);
387 }
388 };
389
390 debug!("Returning table '{catalog_name}.{schema_name}.{table_name}'.");
391
392 let record_batch = FileMetadataBuilder::build_from_table(
393 &catalog_name,
394 &schema_name,
395 &table_name,
396 table_provider,
397 table_schema,
398 context,
399 )
400 .await?;
401
402 Ok(record_batch.into_iter().collect_vec())
403 };
404
405 Ok(record_batch)
406 }
407}
408
409impl std::fmt::Debug for FileMetadataExec {
410 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
411 f.debug_struct("FileMetadataExec")
412 .field("plan_properties", &self.plan_properties)
413 .field("filters", &self.filters)
414 .field("limit", &self.limit)
415 .finish_non_exhaustive()
416 }
417}
418
419impl DisplayAs for FileMetadataExec {
420 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
421 write!(f, "FileMetadataExec: ")?;
422
423 write!(f, "filters=[")?;
424 let mut filters = self.filters.iter().peekable();
425 while let Some(filter) = filters.next() {
426 std::fmt::Display::fmt(filter, f)?;
427 if filters.peek().is_some() {
428 write!(f, ", ")?;
429 }
430 }
431 write!(f, "]")?;
432
433 if let Some(limit) = &self.limit {
434 write!(f, ", limit={limit}")?;
435 }
436
437 Ok(())
438 }
439}
440
/// Columnar accumulator for file-metadata rows; one Arrow array builder per
/// output column, finalized into a single `RecordBatch` by `finish`.
struct FileMetadataBuilder {
    // Output schema the finished batch is validated against.
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    file_paths: StringBuilder,
    // Nanoseconds since epoch, from ObjectMeta::last_modified.
    last_modified: Int64Builder,
    // File size in bytes.
    size: UInt64Builder,
}
450
451impl FileMetadataBuilder {
452 fn new(schema: SchemaRef) -> Self {
453 Self {
454 schema,
455 catalog_names: StringBuilder::new(),
456 schema_names: StringBuilder::new(),
457 table_names: StringBuilder::new(),
458 file_paths: StringBuilder::new(),
459 last_modified: Int64Builder::new(),
460 size: UInt64Builder::new(),
461 }
462 }
463
464 async fn build_from_catalog_list(
465 catalog_list: Arc<dyn CatalogProviderList>,
466 schema: SchemaRef,
467 context: Arc<TaskContext>,
468 ) -> Result<Vec<RecordBatch>> {
469 let mut tasks = vec![];
470
471 for catalog_name in catalog_list.catalog_names() {
472 let catalog_provider = match catalog_list.catalog(&catalog_name) {
473 Some(catalog_provider) => catalog_provider,
474 None => continue,
475 };
476 let schema = schema.clone();
477 let context = context.clone();
478
479 tasks.push(async move {
480 Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await
481 });
482 }
483
484 let results = future::join_all(tasks).await;
485
486 let record_batches = results
487 .into_iter()
488 .collect::<Result<Vec<_>, _>>()?
489 .into_iter()
490 .flatten()
491 .collect();
492
493 Ok(record_batches)
494 }
495
496 async fn build_from_catalog(
497 catalog_name: &str,
498 catalog_provider: Arc<dyn CatalogProvider>,
499 schema: SchemaRef,
500 context: Arc<TaskContext>,
501 ) -> Result<Vec<RecordBatch>> {
502 let mut tasks = vec![];
503
504 for schema_name in catalog_provider.schema_names() {
505 let schema_provider = match catalog_provider.schema(&schema_name) {
506 Some(schema_provider) => schema_provider,
507 None => continue,
508 };
509 let schema = schema.clone();
510 let context = context.clone();
511
512 tasks.push(async move {
513 Self::build_from_schema(
514 catalog_name,
515 &schema_name,
516 schema_provider,
517 schema,
518 context,
519 )
520 .await
521 });
522 }
523
524 let results = future::join_all(tasks).await;
525
526 let record_batches = results
527 .into_iter()
528 .collect::<Result<Vec<_>, _>>()?
529 .into_iter()
530 .flatten()
531 .collect();
532
533 Ok(record_batches)
534 }
535
536 async fn build_from_schema(
537 catalog_name: &str,
538 schema_name: &str,
539 schema_provider: Arc<dyn SchemaProvider>,
540 schema: SchemaRef,
541 context: Arc<TaskContext>,
542 ) -> Result<Vec<RecordBatch>> {
543 let mut tasks = vec![];
544
545 for table_name in schema_provider.table_names() {
546 let table_provider = match schema_provider.table(&table_name).await? {
547 Some(table_provider) => table_provider,
548 None => continue,
549 };
550 let schema = schema.clone();
551 let context = context.clone();
552
553 tasks.push(async move {
554 Self::build_from_table(
555 catalog_name,
556 schema_name,
557 &table_name,
558 table_provider,
559 schema,
560 context,
561 )
562 .await
563 })
564 }
565
566 let results = future::join_all(tasks).await;
567 let record_batches = results
568 .into_iter()
569 .collect::<Result<Vec<_>, _>>()?
570 .into_iter()
571 .flatten()
572 .collect();
573
574 Ok(record_batches)
575 }
576
577 async fn build_from_table(
578 catalog_name: &str,
579 schema_name: &str,
580 table_name: &str,
581 table_provider: Arc<dyn TableProvider>,
582 schema: SchemaRef,
583 context: Arc<TaskContext>,
584 ) -> Result<Option<RecordBatch>> {
585 let mut builder = Self::new(schema.clone());
586
587 let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
588 None => return Ok(None),
589 Some(t) => t,
590 };
591
592 let table_paths = listing_table_like.table_paths();
593 let file_extension = listing_table_like.file_ext();
594
595 for table_path in table_paths {
596 builder
597 .read_table_files(
598 catalog_name,
599 schema_name,
600 table_name,
601 &table_path,
602 &file_extension,
603 &context,
604 )
605 .await?;
606 }
607
608 builder.finish().map(Some)
609 }
610
611 async fn read_table_files(
612 &mut self,
613 catalog_name: &str,
614 schema_name: &str,
615 table_name: &str,
616 table_path: &ListingTableUrl,
617 file_ext: &str,
618 context: &TaskContext,
619 ) -> Result<()> {
620 let store_url = table_path.object_store();
621 let store = context.runtime_env().object_store(table_path)?;
622
623 let mut file_stream = list_all_files(
624 store.as_ref(),
625 table_path,
626 file_ext,
627 context
628 .session_config()
629 .options()
630 .execution
631 .listing_table_ignore_subdirectory,
632 )
633 .await;
634
635 while let Some(file_meta) = file_stream.try_next().await? {
636 self.append(
637 catalog_name,
638 schema_name,
639 table_name,
640 &store_url,
641 &file_meta,
642 );
643 }
644
645 Ok(())
646 }
647
648 fn append(
649 &mut self,
650 catalog_name: &str,
651 schema_name: &str,
652 table_name: &str,
653 store_url: &ObjectStoreUrl,
654 meta: &ObjectMeta,
655 ) {
656 self.catalog_names.append_value(catalog_name);
657 self.schema_names.append_value(schema_name);
658 self.table_names.append_value(table_name);
659 self.file_paths
660 .append_value(format!("{store_url}{}", meta.location));
661 self.last_modified
662 .append_option(meta.last_modified.timestamp_nanos_opt());
663 self.size.append_value(meta.size as u64); }
665
666 fn finish(mut self) -> Result<RecordBatch> {
667 RecordBatch::try_new(
668 self.schema,
669 vec![
670 Arc::new(self.catalog_names.finish()),
671 Arc::new(self.schema_names.finish()),
672 Arc::new(self.table_names.finish()),
673 Arc::new(self.file_paths.finish()),
674 Arc::new(self.last_modified.finish()),
675 Arc::new(self.size.finish()),
676 ],
677 )
678 .map_err(From::from)
679 }
680}
681
682async fn list_all_files<'a>(
685 store: &'a dyn ObjectStore,
686 url: &'a ListingTableUrl,
687 file_extension: &'a str,
688 ignore_subdirectory: bool,
689) -> BoxStream<'a, Result<ObjectMeta>> {
690 if let Err(object_store::Error::NotFound { path, .. }) =
692 store.list_with_delimiter(Some(url.prefix())).await
693 {
694 debug!(
695 "attempted to list empty table at {path} during file_metadata listing, returning empty list"
696 );
697 return Box::pin(stream::empty());
698 }
699
700 let is_dir = url.as_str().ends_with('/');
701 let list = match is_dir {
702 true => store.list(Some(url.prefix())),
703 false => futures::stream::once(store.head(url.prefix())).boxed(),
704 };
705
706 list.map_err(Into::into)
707 .try_filter(move |meta| {
708 let path = &meta.location;
709 let extension_match = path.as_ref().ends_with(file_extension);
710 let glob_match = url.contains(path, ignore_subdirectory);
711 futures::future::ready(extension_match && glob_match)
712 })
713 .boxed()
714}
715
#[cfg(test)]
mod test {
    use std::{ops::Deref, sync::Arc};

    use anyhow::{Context, Result};
    use datafusion::{
        assert_batches_sorted_eq,
        catalog_common::{MemoryCatalogProvider, MemorySchemaProvider},
        execution::{
            object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
            runtime_env::RuntimeEnvBuilder,
        },
        prelude::{SessionConfig, SessionContext},
    };
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;
    use url::Url;

    use super::FileMetadata;

    // Keeps the TempDir alive alongside the SessionContext so the local
    // object store's backing directory isn't deleted while tests run.
    struct TestContext {
        _dir: TempDir,
        ctx: SessionContext,
    }

    impl Deref for TestContext {
        type Target = SessionContext;

        fn deref(&self) -> &Self::Target {
            &self.ctx
        }
    }

    /// Builds a SessionContext whose `file://` store is rooted in a tempdir,
    /// registers three partitioned CSV tables across two catalogs, registers
    /// the `file_metadata` table under test, and creates a view that strips
    /// `last_modified` and truncates `file_path` to its parent directory so
    /// assertions are stable across runs.
    async fn setup() -> Result<TestContext> {
        let _ = env_logger::builder().is_test(true).try_init();

        let dir = TempDir::new().context("create tempdir")?;
        let store = LocalFileSystem::new_with_prefix(&dir)
            .map(Arc::new)
            .context("create local file system object store")?;

        // Route file:// URLs to the tempdir-backed store.
        let registry = Arc::new(DefaultObjectStoreRegistry::new());
        registry
            .register_store(&Url::parse("file://").unwrap(), store)
            .context("register file system store")
            .expect("should replace existing object store at file://");

        let ctx = SessionContext::new_with_config_rt(
            SessionConfig::new(),
            RuntimeEnvBuilder::new()
                .with_object_store_registry(registry)
                .build_arc()
                .context("create RuntimeEnv")?,
        );

        // Extra schema in the default catalog plus a second catalog, to
        // exercise catalog/schema/table filter pruning.
        ctx.catalog("datafusion")
            .context("get default catalog")?
            .register_schema("private", Arc::<MemorySchemaProvider>::default())
            .context("register datafusion.private schema")?;

        ctx.register_catalog("datafusion_mv", Arc::<MemoryCatalogProvider>::default());

        ctx.catalog("datafusion_mv")
            .context("get datafusion_mv catalog")?
            .register_schema("public", Arc::<MemorySchemaProvider>::default())
            .context("register datafusion_mv.public schema")?;

        // datafusion.public.t1: partitioned by year.
        ctx.sql(
            "
            CREATE EXTERNAL TABLE t1 (num INTEGER, year TEXT)
            STORED AS CSV
            PARTITIONED BY (year)
            LOCATION 'file:///t1/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO t1 VALUES
            (1, '2021'),
            (2, '2022'),
            (3, '2023'),
            (4, '2024')
            ",
        )
        .await?
        .collect()
        .await?;

        // datafusion.private.t1: two partition columns (year, month).
        ctx.sql(
            "
            CREATE EXTERNAL TABLE private.t1 (num INTEGER, year TEXT, month TEXT)
            STORED AS CSV
            PARTITIONED BY (year, month)
            LOCATION 'file:///private/t1/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO private.t1 VALUES
            (1, '2021', '01'),
            (2, '2022', '02'),
            (3, '2023', '03'),
            (4, '2024', '04')
            ",
        )
        .await?
        .collect()
        .await?;

        // datafusion_mv.public.t3: table in the secondary catalog.
        ctx.sql(
            "
            CREATE EXTERNAL TABLE datafusion_mv.public.t3 (num INTEGER, date DATE)
            STORED AS CSV
            PARTITIONED BY (date)
            LOCATION 'file:///datafusion_mv/public/t3/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO datafusion_mv.public.t3 VALUES
            (1, '2021-01-01'),
            (2, '2022-02-02'),
            (3, '2023-03-03'),
            (4, '2024-04-04')
            ",
        )
        .await?
        .collect()
        .await?;

        // The table under test, backed by the session's catalog list.
        ctx.register_table(
            "file_metadata",
            Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))),
        )
        .context("register file metadata table")?;

        // View used by all tests: drops the nondeterministic columns and
        // replaces each file path by its parent directory.
        ctx.sql(
            "CREATE VIEW file_metadata_test_view AS SELECT
                * EXCLUDE(file_path, last_modified),
                regexp_replace(file_path, '/[^/]*$', '/') AS file_path
            FROM file_metadata",
        )
        .await
        .context("create file metadata test view")?;

        Ok(TestContext { _dir: dir, ctx })
    }

    // No filters: expect every file of every table in every catalog.
    #[tokio::test]
    async fn test_list_all_files() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql("SELECT * FROM file_metadata_test_view")
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/           |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Catalog-only filter: prunes the walk to one catalog.
    #[tokio::test]
    async fn test_list_catalog() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion_mv'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Catalog + schema filters: prunes the walk to one schema.
    #[tokio::test]
    async fn test_list_catalog_and_schema() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion' AND table_schema = 'private'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+----------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                              |",
            "+---------------+--------------+------------+------+----------------------------------------+",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
            "+---------------+--------------+------------+------+----------------------------------------+",
        ], &results);

        Ok(())
    }

    // Schema-only filter (no catalog): the walk still spans all catalogs,
    // so both catalogs' 'public' schemas appear.
    #[tokio::test]
    async fn test_list_schema_only() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_schema = 'public'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Fully-qualified filter: prunes the walk to a single table.
    #[tokio::test]
    async fn test_list_catalog_schema_and_table() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion' AND table_schema = 'public' AND table_name = 't1'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(
            &[
                "+---------------+--------------+------------+------+-----------------------+",
                "| table_catalog | table_schema | table_name | size | file_path             |",
                "+---------------+--------------+------------+------+-----------------------+",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/ |",
                "+---------------+--------------+------------+------+-----------------------+",
            ],
            &results
        );

        Ok(())
    }

    // Table-name-only filter: matches tables of that name in every schema.
    #[tokio::test]
    async fn test_list_table_only() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_name = 't1'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(
            &[
                "+---------------+--------------+------------+------+----------------------------------------+",
                "| table_catalog | table_schema | table_name | size | file_path                              |",
                "+---------------+--------------+------------+------+----------------------------------------+",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                  |",
                "+---------------+--------------+------------+------+----------------------------------------+",
            ],
            &results
        );

        Ok(())
    }
}
1054}