Skip to main content

datafusion_catalog/dynamic_file/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DynamicFileCatalog`] that creates tables from file paths
19
20use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25/// Wrap another catalog provider list
26#[derive(Debug)]
27pub struct DynamicFileCatalog {
28    /// The inner catalog provider list
29    inner: Arc<dyn CatalogProviderList>,
30    /// The factory that can create a table provider from the file path
31    factory: Arc<dyn UrlTableFactory>,
32}
33
34impl DynamicFileCatalog {
35    pub fn new(
36        inner: Arc<dyn CatalogProviderList>,
37        factory: Arc<dyn UrlTableFactory>,
38    ) -> Self {
39        Self { inner, factory }
40    }
41}
42
43impl CatalogProviderList for DynamicFileCatalog {
44    fn register_catalog(
45        &self,
46        name: String,
47        catalog: Arc<dyn CatalogProvider>,
48    ) -> Option<Arc<dyn CatalogProvider>> {
49        self.inner.register_catalog(name, catalog)
50    }
51
52    fn catalog_names(&self) -> Vec<String> {
53        self.inner.catalog_names()
54    }
55
56    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
57        self.inner.catalog(name).map(|catalog| {
58            Arc::new(DynamicFileCatalogProvider::new(
59                catalog,
60                Arc::clone(&self.factory),
61            )) as _
62        })
63    }
64}
65
66/// Wraps another catalog provider
67#[derive(Debug)]
68struct DynamicFileCatalogProvider {
69    /// The inner catalog provider
70    inner: Arc<dyn CatalogProvider>,
71    /// The factory that can create a table provider from the file path
72    factory: Arc<dyn UrlTableFactory>,
73}
74
75impl DynamicFileCatalogProvider {
76    pub fn new(
77        inner: Arc<dyn CatalogProvider>,
78        factory: Arc<dyn UrlTableFactory>,
79    ) -> Self {
80        Self { inner, factory }
81    }
82}
83
84impl CatalogProvider for DynamicFileCatalogProvider {
85    fn schema_names(&self) -> Vec<String> {
86        self.inner.schema_names()
87    }
88
89    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
90        self.inner.schema(name).map(|schema| {
91            Arc::new(DynamicFileSchemaProvider::new(
92                schema,
93                Arc::clone(&self.factory),
94            )) as _
95        })
96    }
97
98    fn register_schema(
99        &self,
100        name: &str,
101        schema: Arc<dyn SchemaProvider>,
102    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
103        self.inner.register_schema(name, schema)
104    }
105}
106
107/// Implements the [DynamicFileSchemaProvider] that can create tables provider from the file path.
108///
109/// The provider will try to create a table provider from the file path if the table provider
110/// isn't exist in the inner schema provider.
111#[derive(Debug)]
112pub struct DynamicFileSchemaProvider {
113    /// The inner schema provider
114    inner: Arc<dyn SchemaProvider>,
115    /// The factory that can create a table provider from the file path
116    factory: Arc<dyn UrlTableFactory>,
117}
118
119impl DynamicFileSchemaProvider {
120    /// Create a new [DynamicFileSchemaProvider] with the given inner schema provider.
121    pub fn new(
122        inner: Arc<dyn SchemaProvider>,
123        factory: Arc<dyn UrlTableFactory>,
124    ) -> Self {
125        Self { inner, factory }
126    }
127}
128
129#[async_trait]
130impl SchemaProvider for DynamicFileSchemaProvider {
131    fn table_names(&self) -> Vec<String> {
132        self.inner.table_names()
133    }
134
135    async fn table(
136        &self,
137        name: &str,
138    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
139        if let Some(table) = self.inner.table(name).await? {
140            return Ok(Some(table));
141        };
142
143        self.factory.try_new(name).await
144    }
145
146    fn register_table(
147        &self,
148        name: String,
149        table: Arc<dyn TableProvider>,
150    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
151        self.inner.register_table(name, table)
152    }
153
154    fn deregister_table(
155        &self,
156        name: &str,
157    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
158        self.inner.deregister_table(name)
159    }
160
161    fn table_exist(&self, name: &str) -> bool {
162        self.inner.table_exist(name)
163    }
164}
165
166/// [UrlTableFactory] is a factory that can create a table provider from the given url.
167#[async_trait]
168pub trait UrlTableFactory: Debug + Sync + Send {
169    /// create a new table provider from the provided url
170    async fn try_new(
171        &self,
172        url: &str,
173    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
174}