1use arrow::array::{StringBuilder, TimestampNanosecondBuilder, UInt64Builder};
19use arrow::record_batch::RecordBatch;
20use arrow_schema::{DataType, Field, TimeUnit};
21use async_trait::async_trait;
22use datafusion::arrow::datatypes::{Schema, SchemaRef};
23use datafusion::catalog::SchemaProvider;
24use datafusion::catalog::{CatalogProvider, Session};
25use datafusion::datasource::listing::ListingTableUrl;
26use datafusion::datasource::TableProvider;
27use datafusion::execution::object_store::ObjectStoreUrl;
28use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
29use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
30use datafusion::physical_plan::limit::LimitStream;
31use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
32use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion::physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
35};
36use datafusion::{
37 catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream,
38};
39use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
40use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
41use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
42use futures::stream::{self, BoxStream};
43use futures::{future, Future, FutureExt, StreamExt, TryStreamExt};
44use itertools::Itertools;
45use log::debug;
46use object_store::{ObjectMeta, ObjectStore};
47use std::any::Any;
48use std::sync::Arc;
49
50use crate::materialized::cast_to_listing_table;
51
/// A `TableProvider` exposing, as a queryable table, the per-file metadata
/// (path, last-modified timestamp, size) of every listing-style table
/// reachable through a catalog list.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    // Fixed six-column schema built in `FileMetadata::new`:
    // table_catalog, table_schema, table_name, file_path, last_modified, size.
    table_schema: SchemaRef,
    // Catalog hierarchy to walk when listing files.
    catalog_list: Arc<dyn CatalogProviderList>,
    // Strategy used to enumerate the files behind each table.
    metadata_provider: Arc<dyn FileMetadataProvider>,
}
59
60impl FileMetadata {
61 pub fn new(
64 catalog_list: Arc<dyn CatalogProviderList>,
65 metadata_provider: Arc<dyn FileMetadataProvider>,
66 ) -> Self {
67 Self {
68 table_schema: Arc::new(Schema::new(vec![
69 Field::new("table_catalog", DataType::Utf8, false),
70 Field::new("table_schema", DataType::Utf8, false),
71 Field::new("table_name", DataType::Utf8, false),
72 Field::new("file_path", DataType::Utf8, false),
73 Field::new(
74 "last_modified",
75 DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))),
76 false,
77 ),
78 Field::new("size", DataType::UInt64, false),
79 ])),
80 catalog_list,
81 metadata_provider,
82 }
83 }
84}
85
86#[async_trait]
87impl TableProvider for FileMetadata {
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91
92 fn schema(&self) -> SchemaRef {
93 self.table_schema.clone()
94 }
95
96 fn table_type(&self) -> TableType {
97 TableType::Base
98 }
99
100 async fn scan(
101 &self,
102 session_state: &dyn Session,
103 projection: Option<&Vec<usize>>,
104 filters: &[Expr],
105 limit: Option<usize>,
106 ) -> Result<Arc<dyn ExecutionPlan>> {
107 let dfschema = DFSchema::try_from(self.table_schema.as_ref().clone())?;
108
109 let filters = filters
110 .iter()
111 .map(|expr| {
112 create_physical_expr(expr, &dfschema, session_state.execution_props())
113 .map_err(|e| e.context("failed to create file metadata physical expr"))
114 })
115 .collect::<Result<Vec<_>, _>>()?;
116
117 let exec = FileMetadataExec::try_new(
118 self.table_schema.clone(),
119 projection.cloned(),
120 filters,
121 limit,
122 self.catalog_list.clone(),
123 self.metadata_provider.clone(),
124 )?;
125
126 Ok(Arc::new(exec))
127 }
128
129 fn supports_filters_pushdown(
130 &self,
131 filters: &[&Expr],
132 ) -> Result<Vec<TableProviderFilterPushDown>> {
133 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
134 }
135}
136
/// Physical leaf node that walks the catalog hierarchy and emits one row
/// per file backing a listing-style table.
pub struct FileMetadataExec {
    // Full (unprojected) output schema of the file-metadata table.
    table_schema: SchemaRef,
    // Cached plan properties (projected schema, single partition, bounded).
    plan_properties: PlanProperties,
    // Optional column projection, as indices into `table_schema`.
    projection: Option<Vec<usize>>,
    // Physical filters; equality filters on catalog/schema/table prune the walk.
    filters: Vec<Arc<dyn PhysicalExpr>>,
    // Optional row limit, enforced by wrapping the output stream.
    limit: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    catalog_list: Arc<dyn CatalogProviderList>,
    metadata_provider: Arc<dyn FileMetadataProvider>,
}
148
149impl FileMetadataExec {
150 fn try_new(
151 table_schema: SchemaRef,
152 projection: Option<Vec<usize>>,
153 filters: Vec<Arc<dyn PhysicalExpr>>,
154 limit: Option<usize>,
155 catalog_list: Arc<dyn CatalogProviderList>,
156 metadata_provider: Arc<dyn FileMetadataProvider>,
157 ) -> Result<Self> {
158 let projected_schema = match projection.as_ref() {
159 Some(projection) => Arc::new(table_schema.project(projection)?),
160 None => table_schema.clone(),
161 };
162 let eq_properties = EquivalenceProperties::new(projected_schema);
163 let partitioning = Partitioning::UnknownPartitioning(1);
164 let plan_properties = PlanProperties::new(
165 eq_properties,
166 partitioning,
167 EmissionType::Final,
168 Boundedness::Bounded,
169 );
170
171 let exec = Self {
172 table_schema,
173 plan_properties,
174 projection,
175 filters,
176 limit,
177 metrics: ExecutionPlanMetricsSet::new(),
178 catalog_list,
179 metadata_provider,
180 };
181
182 Ok(exec)
183 }
184}
185
impl ExecutionPlan for FileMetadataExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "FileMetadataExec"
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    // Leaf node: the metadata rows are produced directly, not from children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // No children to replace, so the node is returned unchanged.
        Ok(self)
    }

    /// Stream the file-metadata record batches, applying the column
    /// projection and (if present) the row limit.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let projection = self.projection.clone();
        // Future that resolves to all record batches for this scan.
        let record_batches = self.build_record_batch(context)?;

        // Apply the projection (if any) to every produced batch; an error
        // from the batch-building future becomes a single error element.
        let projected_record_batches = record_batches.map(move |record_batches| {
            let record_batches = match record_batches {
                Ok(record_batches) => record_batches,
                Err(err) => return vec![Err(err)],
            };

            if let Some(projection) = projection {
                return record_batches
                    .into_iter()
                    .map(|record_batch| {
                        record_batch
                            .project(&projection)
                            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
                    })
                    .collect::<Vec<_>>();
            }

            record_batches.into_iter().map(Ok).collect::<Vec<_>>()
        });

        // Adapt the one-shot future into a record-batch stream carrying this
        // plan's (projected) schema.
        let mut record_batch_stream: SendableRecordBatchStream =
            Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(projected_record_batches)
                    .map(stream::iter)
                    .flatten(),
            ));

        // Enforce LIMIT by wrapping the stream; baseline metrics record the
        // rows actually emitted for this partition.
        if let Some(limit) = self.limit {
            let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
            let limit_stream =
                LimitStream::new(record_batch_stream, 0, Some(limit), baseline_metrics);
            record_batch_stream = Box::pin(limit_stream);
        }

        Ok(record_batch_stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
260
261impl FileMetadataExec {
262 fn get_column_index(&self, column_name: &str) -> Result<usize> {
263 let (index, _) = self
264 .table_schema
265 .column_with_name(column_name)
266 .ok_or_else(|| {
267 DataFusionError::Internal(format!("column '{column_name}' does not exists"))
268 })?;
269 Ok(index)
270 }
271
272 fn get_column_literal(column_idx: usize, filter: &Arc<dyn PhysicalExpr>) -> Option<String> {
274 let binary_expr = filter.as_any().downcast_ref::<BinaryExpr>()?;
275
276 if !matches!(binary_expr.op(), Operator::Eq) {
277 return None;
278 }
279
280 let (column, literal) = if let Some(left_column) =
281 binary_expr.left().as_any().downcast_ref::<Column>()
282 {
283 let right_literal = binary_expr.right().as_any().downcast_ref::<Literal>()?;
284 (left_column, right_literal)
285 } else if let Some(right_column) = binary_expr.right().as_any().downcast_ref::<Column>() {
286 let left_literal = binary_expr.left().as_any().downcast_ref::<Literal>()?;
287 (right_column, left_literal)
288 } else {
289 return None;
290 };
291
292 if column.index() != column_idx {
293 return None;
294 }
295
296 match literal.value() {
297 ScalarValue::Utf8(str) => str.clone(),
298 ScalarValue::LargeUtf8(str) => str.clone(),
299 _ => None,
300 }
301 }
302
303 fn build_record_batch(
305 &self,
306 context: Arc<TaskContext>,
307 ) -> Result<impl Future<Output = Result<Vec<RecordBatch>>>> {
308 let catalog_column = self.get_column_index("table_catalog")?;
309 let schema_column = self.get_column_index("table_schema")?;
310 let table_column = self.get_column_index("table_name")?;
311
312 let catalog_name = self
313 .filters
314 .iter()
315 .filter_map(|filter| Self::get_column_literal(catalog_column, filter))
316 .next();
317
318 let schema_name = self
319 .filters
320 .iter()
321 .filter_map(|filter| Self::get_column_literal(schema_column, filter))
322 .next();
323
324 let table_name = self
325 .filters
326 .iter()
327 .filter_map(|filter| Self::get_column_literal(table_column, filter))
328 .next();
329
330 let table_schema = self.table_schema.clone();
331 let catalog_list = self.catalog_list.clone();
332 let metadata_provider = self.metadata_provider.clone();
333
334 let record_batch = async move {
335 let catalog_name = match catalog_name {
337 Some(catalog_name) => catalog_name,
338 None => {
339 debug!("No catalog filter exists, returning entire catalog list.");
340 return FileMetadataBuilder::build_from_catalog_list(
341 catalog_list,
342 metadata_provider,
343 table_schema,
344 context,
345 )
346 .await;
347 }
348 };
349
350 let catalog_provider = match catalog_list.catalog(&catalog_name) {
352 Some(catalog_provider) => catalog_provider,
353 None => {
354 debug!("No catalog named '{catalog_name}' exists, returning an empty result.");
355 return Ok(vec![]);
356 }
357 };
358
359 let schema_name = match schema_name {
361 Some(schema_name) => schema_name,
362 None => {
363 debug!("No schema filter exists, returning catalog '{catalog_name}'.");
364 return FileMetadataBuilder::build_from_catalog(
365 &catalog_name,
366 catalog_provider,
367 metadata_provider,
368 table_schema,
369 context,
370 )
371 .await;
372 }
373 };
374
375 let schema_provider = match catalog_provider.schema(&schema_name) {
377 Some(schema_provider) => schema_provider,
378 None => {
379 debug!("No schema named '{catalog_name}.{schema_name}' exists, returning an empty result.");
380 return Ok(vec![]);
381 }
382 };
383
384 let table_name = match table_name {
386 Some(table_name) => table_name,
387 None => {
388 debug!(
389 "No table filter exists, returning schema '{catalog_name}.{schema_name}'."
390 );
391 return FileMetadataBuilder::build_from_schema(
392 &catalog_name,
393 &schema_name,
394 schema_provider,
395 metadata_provider,
396 table_schema,
397 context,
398 )
399 .await;
400 }
401 };
402
403 let table_provider = match schema_provider.table(&table_name).await? {
405 Some(table_provider) => table_provider,
406 None => {
407 debug!("No table named '{catalog_name}.{schema_name}.{table_name}' exists, returning an empty result.");
408 return Ok(vec![]);
409 }
410 };
411
412 debug!("Returning table '{catalog_name}.{schema_name}.{table_name}'.");
413
414 let record_batch = FileMetadataBuilder::build_from_table(
415 &catalog_name,
416 &schema_name,
417 &table_name,
418 table_provider,
419 metadata_provider,
420 table_schema,
421 context,
422 )
423 .await?;
424
425 Ok(record_batch.into_iter().collect_vec())
426 };
427
428 Ok(record_batch)
429 }
430}
431
432impl std::fmt::Debug for FileMetadataExec {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 f.debug_struct("FileMetadataExec")
435 .field("plan_properties", &self.plan_properties)
436 .field("filters", &self.filters)
437 .field("limit", &self.limit)
438 .finish_non_exhaustive()
439 }
440}
441
442impl DisplayAs for FileMetadataExec {
443 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
444 write!(f, "FileMetadataExec: ")?;
445
446 write!(f, "filters=[")?;
447 let mut filters = self.filters.iter().peekable();
448 while let Some(filter) = filters.next() {
449 std::fmt::Display::fmt(filter, f)?;
450 if filters.peek().is_some() {
451 write!(f, ", ")?;
452 }
453 }
454 write!(f, "]")?;
455
456 if let Some(limit) = &self.limit {
457 write!(f, ", limit={limit}")?;
458 }
459
460 Ok(())
461 }
462}
463
/// Incrementally builds one `RecordBatch` of file-metadata rows, with one
/// Arrow array builder per output column.
struct FileMetadataBuilder {
    // Output schema; must match the column order used in `finish`.
    schema: SchemaRef,
    // Enumerates the files behind each table path.
    metadata_provider: Arc<dyn FileMetadataProvider>,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    file_paths: StringBuilder,
    // Nanosecond timestamps, tagged UTC in `new` to match the table schema.
    last_modified: TimestampNanosecondBuilder,
    size: UInt64Builder,
}
474
impl FileMetadataBuilder {
    /// Create an empty builder that appends rows matching `schema`.
    fn new(schema: SchemaRef, metadata_provider: Arc<dyn FileMetadataProvider>) -> Self {
        Self {
            schema,
            metadata_provider,
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            file_paths: StringBuilder::new(),
            // Timezone must match the field declared in `FileMetadata::new`.
            last_modified: TimestampNanosecondBuilder::new().with_timezone("UTC"),
            size: UInt64Builder::new(),
        }
    }

    /// Collect file metadata for every catalog in `catalog_list`, fanning
    /// out one concurrent task per catalog and flattening the results.
    async fn build_from_catalog_list(
        catalog_list: Arc<dyn CatalogProviderList>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for catalog_name in catalog_list.catalog_names() {
            // A catalog may disappear between name listing and lookup; skip it.
            let catalog_provider = match catalog_list.catalog(&catalog_name) {
                Some(catalog_provider) => catalog_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_catalog(
                    &catalog_name,
                    catalog_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            });
        }

        // Run all per-catalog tasks concurrently; fail on the first error.
        let results = future::join_all(tasks).await;

        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Collect file metadata for every schema in one catalog, fanning out
    /// one concurrent task per schema.
    async fn build_from_catalog(
        catalog_name: &str,
        catalog_provider: Arc<dyn CatalogProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for schema_name in catalog_provider.schema_names() {
            // A schema may disappear between name listing and lookup; skip it.
            let schema_provider = match catalog_provider.schema(&schema_name) {
                Some(schema_provider) => schema_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_schema(
                    catalog_name,
                    &schema_name,
                    schema_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            });
        }

        // Run all per-schema tasks concurrently; fail on the first error.
        let results = future::join_all(tasks).await;

        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Collect file metadata for every table in one schema, fanning out one
    /// concurrent task per table. Non-listing tables yield no batch.
    async fn build_from_schema(
        catalog_name: &str,
        schema_name: &str,
        schema_provider: Arc<dyn SchemaProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let mut tasks = vec![];

        for table_name in schema_provider.table_names() {
            // Table resolution is async and fallible; a vanished table is skipped.
            let table_provider = match schema_provider.table(&table_name).await? {
                Some(table_provider) => table_provider,
                None => continue,
            };
            let metadata_provider = metadata_provider.clone();
            let schema = schema.clone();
            let context = context.clone();

            tasks.push(async move {
                Self::build_from_table(
                    catalog_name,
                    schema_name,
                    &table_name,
                    table_provider,
                    metadata_provider,
                    schema,
                    context,
                )
                .await
            })
        }

        // Run all per-table tasks concurrently; fail on the first error.
        let results = future::join_all(tasks).await;
        // `flatten` drops the `None`s returned for non-listing tables.
        let record_batches = results
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .flatten()
            .collect();

        Ok(record_batches)
    }

    /// Collect file metadata for a single table.
    ///
    /// Returns `Ok(None)` when the table is not a listing-style table (it
    /// has no object-store files to report).
    async fn build_from_table(
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        table_provider: Arc<dyn TableProvider>,
        metadata_provider: Arc<dyn FileMetadataProvider>,
        schema: SchemaRef,
        context: Arc<TaskContext>,
    ) -> Result<Option<RecordBatch>> {
        let mut builder = Self::new(schema.clone(), metadata_provider.clone());

        // Only listing-like tables expose paths and a file extension.
        let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
            None => return Ok(None),
            Some(t) => t,
        };

        let table_paths = listing_table_like.table_paths();
        let file_extension = listing_table_like.file_ext();

        // A listing table may span multiple root paths; read them all into
        // the same builder.
        for table_path in table_paths {
            builder
                .read_table_files(
                    catalog_name,
                    schema_name,
                    table_name,
                    &table_path,
                    &file_extension,
                    &context,
                )
                .await?;
        }

        builder.finish().map(Some)
    }

    /// Append one row per file found under `table_path` (matching
    /// `file_ext`) to this builder.
    async fn read_table_files(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        table_path: &ListingTableUrl,
        file_ext: &str,
        context: &TaskContext,
    ) -> Result<()> {
        let store_url = table_path.object_store();
        // Resolve the object store registered for this URL in the runtime env.
        let store = context.runtime_env().object_store(table_path)?;

        let mut file_stream = self
            .metadata_provider
            .list_all_files(
                store.clone(),
                table_path.clone(),
                file_ext.to_string(),
                // Honor the session's subdirectory-listing setting.
                context
                    .session_config()
                    .options()
                    .execution
                    .listing_table_ignore_subdirectory,
            )
            .await;

        // Drain the stream, propagating the first listing error.
        while let Some(file_meta) = file_stream.try_next().await? {
            self.append(
                catalog_name,
                schema_name,
                table_name,
                &store_url,
                &file_meta,
            );
        }

        Ok(())
    }

    /// Append a single file's metadata as one row across all column builders.
    fn append(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        store_url: &ObjectStoreUrl,
        meta: &ObjectMeta,
    ) {
        self.catalog_names.append_value(catalog_name);
        self.schema_names.append_value(schema_name);
        self.table_names.append_value(table_name);
        // Full path = object store URL + object location.
        self.file_paths
            .append_value(format!("{store_url}{}", meta.location));
        // Null when the timestamp does not fit in nanoseconds.
        self.last_modified
            .append_option(meta.last_modified.timestamp_nanos_opt());
        self.size.append_value(meta.size);
    }

    /// Finalize all column builders into a `RecordBatch`.
    /// Column order must match the schema built in `FileMetadata::new`.
    fn finish(mut self) -> Result<RecordBatch> {
        RecordBatch::try_new(
            self.schema,
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.file_paths.finish()),
                Arc::new(self.last_modified.finish()),
                Arc::new(self.size.finish()),
            ],
        )
        .map_err(From::from)
    }
}
724
/// Pluggable strategy for enumerating the object-store files that back a
/// listing-style table.
#[async_trait]
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
    /// List the files under `url` in `store` whose paths end with
    /// `file_extension`, optionally excluding files in subdirectories.
    /// Errors are surfaced through the returned stream's items.
    async fn list_all_files(
        &self,
        store: Arc<dyn ObjectStore>,
        url: ListingTableUrl,
        file_extension: String,
        ignore_subdirectory: bool,
    ) -> BoxStream<'static, Result<ObjectMeta>>;
}
737
/// Default [`FileMetadataProvider`] that lists files directly from the
/// object store.
#[derive(Debug)]
pub struct DefaultFileMetadataProvider;
741
742#[async_trait]
743impl FileMetadataProvider for DefaultFileMetadataProvider {
744 async fn list_all_files(
747 &self,
748 store: Arc<dyn ObjectStore>,
749 url: ListingTableUrl,
750 file_extension: String,
751 ignore_subdirectory: bool,
752 ) -> BoxStream<'static, Result<ObjectMeta>> {
753 if let Err(object_store::Error::NotFound { path, .. }) =
755 store.list_with_delimiter(Some(url.prefix())).await
756 {
757 debug!(
758 "attempted to list empty table at {path} during file_metadata listing, returning empty list"
759 );
760 return Box::pin(stream::empty());
761 }
762
763 let is_dir = url.as_str().ends_with('/');
764 let prefix = url.prefix().clone();
765
766 let list = match is_dir {
767 true => store.list(Some(&prefix)),
768 false => futures::stream::once(async move { store.head(&prefix).await }).boxed(),
769 };
770
771 list.map_err(Into::into)
772 .try_filter(move |meta| {
773 let path = &meta.location;
774 let extension_match = path.as_ref().ends_with(&file_extension);
775 let glob_match = url.contains(path, ignore_subdirectory);
776 futures::future::ready(extension_match && glob_match)
777 })
778 .boxed()
779 }
780}
781
#[cfg(test)]
mod test {
    use std::{ops::Deref, sync::Arc};

    use anyhow::{Context, Result};
    use datafusion::{
        assert_batches_sorted_eq,
        catalog::{MemoryCatalogProvider, MemorySchemaProvider},
        execution::{
            object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
            runtime_env::RuntimeEnvBuilder,
        },
        prelude::{SessionConfig, SessionContext},
    };
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;
    use url::Url;

    use super::{DefaultFileMetadataProvider, FileMetadata};

    // Session context plus the tempdir backing its object store; the
    // tempdir field keeps the files on disk for the context's lifetime.
    struct TestContext {
        _dir: TempDir,
        ctx: SessionContext,
    }

    impl Deref for TestContext {
        type Target = SessionContext;

        fn deref(&self) -> &Self::Target {
            &self.ctx
        }
    }

    /// Build a session with partitioned CSV tables spread across two
    /// catalogs and three schemas, register the `file_metadata` table, and
    /// wrap it in a view that drops the nondeterministic columns (exact
    /// file name and last-modified time) so assertions stay stable.
    async fn setup() -> Result<TestContext> {
        let _ = env_logger::builder().is_test(true).try_init();

        // Local file system store rooted in a fresh tempdir, registered
        // under file:// so the CSV table locations resolve to it.
        let dir = TempDir::new().context("create tempdir")?;
        let store = LocalFileSystem::new_with_prefix(&dir)
            .map(Arc::new)
            .context("create local file system object store")?;

        let registry = Arc::new(DefaultObjectStoreRegistry::new());
        registry
            .register_store(&Url::parse("file://").unwrap(), store)
            .context("register file system store")
            .expect("should replace existing object store at file://");

        let ctx = SessionContext::new_with_config_rt(
            SessionConfig::new(),
            RuntimeEnvBuilder::new()
                .with_object_store_registry(registry)
                .build_arc()
                .context("create RuntimeEnv")?,
        );

        // Extra schema in the default catalog plus a second catalog, so the
        // tests can exercise catalog/schema/table pruning.
        ctx.catalog("datafusion")
            .context("get default catalog")?
            .register_schema("private", Arc::<MemorySchemaProvider>::default())
            .context("register datafusion.private schema")?;

        ctx.register_catalog("datafusion_mv", Arc::<MemoryCatalogProvider>::default());

        ctx.catalog("datafusion_mv")
            .context("get datafusion_mv catalog")?
            .register_schema("public", Arc::<MemorySchemaProvider>::default())
            .context("register datafusion_mv.public schema")?;

        // datafusion.public.t1: partitioned by year.
        ctx.sql(
            "
            CREATE EXTERNAL TABLE t1 (num INTEGER, year TEXT)
            STORED AS CSV
            PARTITIONED BY (year)
            LOCATION 'file:///t1/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO t1 VALUES
            (1, '2021'),
            (2, '2022'),
            (3, '2023'),
            (4, '2024')
            ",
        )
        .await?
        .collect()
        .await?;

        // datafusion.private.t1: partitioned by year and month.
        ctx.sql(
            "
            CREATE EXTERNAL TABLE private.t1 (num INTEGER, year TEXT, month TEXT)
            STORED AS CSV
            PARTITIONED BY (year, month)
            LOCATION 'file:///private/t1/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO private.t1 VALUES
            (1, '2021', '01'),
            (2, '2022', '02'),
            (3, '2023', '03'),
            (4, '2024', '04')
            ",
        )
        .await?
        .collect()
        .await?;

        // datafusion_mv.public.t3: partitioned by date.
        ctx.sql(
            "
            CREATE EXTERNAL TABLE datafusion_mv.public.t3 (num INTEGER, date DATE)
            STORED AS CSV
            PARTITIONED BY (date)
            LOCATION 'file:///datafusion_mv/public/t3/'
            ",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "INSERT INTO datafusion_mv.public.t3 VALUES
            (1, '2021-01-01'),
            (2, '2022-02-02'),
            (3, '2023-03-03'),
            (4, '2024-04-04')
            ",
        )
        .await?
        .collect()
        .await?;

        // The table under test, scanning the session's own catalog list.
        ctx.register_table(
            "file_metadata",
            Arc::new(FileMetadata::new(
                Arc::clone(ctx.state().catalog_list()),
                Arc::new(DefaultFileMetadataProvider),
            )),
        )
        .context("register file metadata table")?;

        // View that replaces the file name with its directory and drops
        // last_modified, making the expected output deterministic.
        ctx.sql(
            "CREATE VIEW file_metadata_test_view AS SELECT
                * EXCLUDE(file_path, last_modified),
                regexp_replace(file_path, '/[^/]*$', '/') AS file_path
            FROM file_metadata",
        )
        .await
        .context("create file metadata test view")?;

        Ok(TestContext { _dir: dir, ctx })
    }

    // No filters: all tables from all catalogs are listed.
    #[tokio::test]
    async fn test_list_all_files() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql("SELECT * FROM file_metadata_test_view")
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/           |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/           |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Catalog filter only: traversal pruned to one catalog.
    #[tokio::test]
    async fn test_list_catalog() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion_mv'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Catalog + schema filters: traversal pruned to one schema.
    #[tokio::test]
    async fn test_list_catalog_and_schema() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion' AND table_schema = 'private'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+----------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                              |",
            "+---------------+--------------+------------+------+----------------------------------------+",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
            "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
            "+---------------+--------------+------------+------+----------------------------------------+",
        ], &results);

        Ok(())
    }

    // Schema filter without a catalog filter: matching schemas across all
    // catalogs are listed (no pruning by catalog).
    #[tokio::test]
    async fn test_list_schema_only() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_schema = 'public'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(&[
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| table_catalog | table_schema | table_name | size | file_path                                        |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                            |",
            "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                            |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2021-01-01/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2022-02-02/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2023-03-03/ |",
            "| datafusion_mv | public       | t3         | 6    | file:///datafusion_mv/public/t3/date=2024-04-04/ |",
            "+---------------+--------------+------------+------+--------------------------------------------------+",
        ], &results);

        Ok(())
    }

    // Fully qualified filter: a single table's files.
    #[tokio::test]
    async fn test_list_catalog_schema_and_table() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_catalog = 'datafusion' AND table_schema = 'public' AND table_name = 't1'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(
            &[
                "+---------------+--------------+------------+------+-----------------------+",
                "| table_catalog | table_schema | table_name | size | file_path             |",
                "+---------------+--------------+------------+------+-----------------------+",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/ |",
                "+---------------+--------------+------------+------+-----------------------+",
            ],
            &results
        );

        Ok(())
    }

    // Table filter without catalog/schema filters: every table with that
    // name, across all catalogs and schemas.
    #[tokio::test]
    async fn test_list_table_only() -> Result<()> {
        let ctx = setup().await.context("setup")?;

        let results = ctx
            .sql(
                "SELECT * FROM file_metadata_test_view
                WHERE table_name = 't1'",
            )
            .await?
            .collect()
            .await?;

        assert_batches_sorted_eq!(
            &[
                "+---------------+--------------+------------+------+----------------------------------------+",
                "| table_catalog | table_schema | table_name | size | file_path                              |",
                "+---------------+--------------+------------+------+----------------------------------------+",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2021/month=01/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2022/month=02/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2023/month=03/ |",
                "| datafusion    | private      | t1         | 6    | file:///private/t1/year=2024/month=04/ |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2021/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2022/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2023/                  |",
                "| datafusion    | public       | t1         | 6    | file:///t1/year=2024/                  |",
                "+---------------+--------------+------------+------+----------------------------------------+",
            ],
            &results
        );

        Ok(())
    }
}