datafusion_catalog/dynamic_file/
catalog.rs1use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25#[derive(Debug)]
27pub struct DynamicFileCatalog {
28 inner: Arc<dyn CatalogProviderList>,
30 factory: Arc<dyn UrlTableFactory>,
32}
33
34impl DynamicFileCatalog {
35 pub fn new(
36 inner: Arc<dyn CatalogProviderList>,
37 factory: Arc<dyn UrlTableFactory>,
38 ) -> Self {
39 Self { inner, factory }
40 }
41}
42
43impl CatalogProviderList for DynamicFileCatalog {
44 fn register_catalog(
45 &self,
46 name: String,
47 catalog: Arc<dyn CatalogProvider>,
48 ) -> Option<Arc<dyn CatalogProvider>> {
49 self.inner.register_catalog(name, catalog)
50 }
51
52 fn catalog_names(&self) -> Vec<String> {
53 self.inner.catalog_names()
54 }
55
56 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
57 self.inner.catalog(name).map(|catalog| {
58 Arc::new(DynamicFileCatalogProvider::new(
59 catalog,
60 Arc::clone(&self.factory),
61 )) as _
62 })
63 }
64}
65
66#[derive(Debug)]
68struct DynamicFileCatalogProvider {
69 inner: Arc<dyn CatalogProvider>,
71 factory: Arc<dyn UrlTableFactory>,
73}
74
75impl DynamicFileCatalogProvider {
76 pub fn new(
77 inner: Arc<dyn CatalogProvider>,
78 factory: Arc<dyn UrlTableFactory>,
79 ) -> Self {
80 Self { inner, factory }
81 }
82}
83
84impl CatalogProvider for DynamicFileCatalogProvider {
85 fn schema_names(&self) -> Vec<String> {
86 self.inner.schema_names()
87 }
88
89 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
90 self.inner.schema(name).map(|schema| {
91 Arc::new(DynamicFileSchemaProvider::new(
92 schema,
93 Arc::clone(&self.factory),
94 )) as _
95 })
96 }
97
98 fn register_schema(
99 &self,
100 name: &str,
101 schema: Arc<dyn SchemaProvider>,
102 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
103 self.inner.register_schema(name, schema)
104 }
105}
106
107#[derive(Debug)]
112pub struct DynamicFileSchemaProvider {
113 inner: Arc<dyn SchemaProvider>,
115 factory: Arc<dyn UrlTableFactory>,
117}
118
119impl DynamicFileSchemaProvider {
120 pub fn new(
122 inner: Arc<dyn SchemaProvider>,
123 factory: Arc<dyn UrlTableFactory>,
124 ) -> Self {
125 Self { inner, factory }
126 }
127}
128
129#[async_trait]
130impl SchemaProvider for DynamicFileSchemaProvider {
131 fn table_names(&self) -> Vec<String> {
132 self.inner.table_names()
133 }
134
135 async fn table(
136 &self,
137 name: &str,
138 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
139 if let Some(table) = self.inner.table(name).await? {
140 return Ok(Some(table));
141 };
142
143 self.factory.try_new(name).await
144 }
145
146 fn register_table(
147 &self,
148 name: String,
149 table: Arc<dyn TableProvider>,
150 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
151 self.inner.register_table(name, table)
152 }
153
154 fn deregister_table(
155 &self,
156 name: &str,
157 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
158 self.inner.deregister_table(name)
159 }
160
161 fn table_exist(&self, name: &str) -> bool {
162 self.inner.table_exist(name)
163 }
164}
165
166#[async_trait]
168pub trait UrlTableFactory: Debug + Sync + Send {
169 async fn try_new(
171 &self,
172 url: &str,
173 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
174}