paimon_datafusion/
catalog.rs1use std::any::Any;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use datafusion::catalog::{CatalogProvider, SchemaProvider};
27use datafusion::datasource::TableProvider;
28use datafusion::error::Result as DFResult;
29use paimon::catalog::{Catalog, Identifier};
30
31use crate::error::to_datafusion_error;
32use crate::runtime::{await_with_runtime, block_on_with_runtime};
33use crate::system_tables;
34use crate::table::PaimonTableProvider;
35
36pub struct PaimonCatalogProvider {
42 catalog: Arc<dyn Catalog>,
44}
45
46impl Debug for PaimonCatalogProvider {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("PaimonCatalogProvider").finish()
49 }
50}
51
52impl PaimonCatalogProvider {
53 pub fn new(catalog: Arc<dyn Catalog>) -> Self {
57 PaimonCatalogProvider { catalog }
58 }
59}
60
61impl CatalogProvider for PaimonCatalogProvider {
62 fn as_any(&self) -> &dyn Any {
63 self
64 }
65
66 fn schema_names(&self) -> Vec<String> {
67 let catalog = Arc::clone(&self.catalog);
68 block_on_with_runtime(
69 async move { catalog.list_databases().await.unwrap_or_default() },
70 "paimon catalog access thread panicked",
71 )
72 }
73
74 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
75 let catalog = Arc::clone(&self.catalog);
76 let name = name.to_string();
77 block_on_with_runtime(
78 async move {
79 match catalog.get_database(&name).await {
80 Ok(_) => Some(
81 Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
82 as Arc<dyn SchemaProvider>,
83 ),
84 Err(paimon::Error::DatabaseNotExist { .. }) => None,
85 Err(_) => None,
86 }
87 },
88 "paimon catalog access thread panicked",
89 )
90 }
91
92 fn register_schema(
93 &self,
94 name: &str,
95 _schema: Arc<dyn SchemaProvider>,
96 ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
97 let catalog = Arc::clone(&self.catalog);
98 let name = name.to_string();
99 block_on_with_runtime(
100 async move {
101 catalog
102 .create_database(&name, false, HashMap::new())
103 .await
104 .map_err(to_datafusion_error)?;
105 Ok(Some(
106 Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
107 as Arc<dyn SchemaProvider>,
108 ))
109 },
110 "paimon catalog access thread panicked",
111 )
112 }
113
114 fn deregister_schema(
115 &self,
116 name: &str,
117 cascade: bool,
118 ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
119 let catalog = Arc::clone(&self.catalog);
120 let name = name.to_string();
121 block_on_with_runtime(
122 async move {
123 catalog
124 .drop_database(&name, false, cascade)
125 .await
126 .map_err(to_datafusion_error)?;
127 Ok(Some(
128 Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
129 as Arc<dyn SchemaProvider>,
130 ))
131 },
132 "paimon catalog access thread panicked",
133 )
134 }
135}
136
137pub struct PaimonSchemaProvider {
142 catalog: Arc<dyn Catalog>,
144 database: String,
146}
147
148impl Debug for PaimonSchemaProvider {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("PaimonSchemaProvider")
151 .field("database", &self.database)
152 .finish()
153 }
154}
155
156impl PaimonSchemaProvider {
157 pub fn new(catalog: Arc<dyn Catalog>, database: String) -> Self {
159 PaimonSchemaProvider { catalog, database }
160 }
161}
162
163#[async_trait]
164impl SchemaProvider for PaimonSchemaProvider {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn table_names(&self) -> Vec<String> {
170 let catalog = Arc::clone(&self.catalog);
171 let database = self.database.clone();
172 block_on_with_runtime(
173 async move { catalog.list_tables(&database).await.unwrap_or_default() },
174 "paimon catalog access thread panicked",
175 )
176 }
177
178 async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
179 let (base, system_name) = system_tables::split_object_name(name);
180 if let Some(system_name) = system_name {
181 return await_with_runtime(system_tables::load(
182 Arc::clone(&self.catalog),
183 self.database.clone(),
184 base.to_string(),
185 system_name.to_string(),
186 ))
187 .await;
188 }
189
190 let catalog = Arc::clone(&self.catalog);
191 let identifier = Identifier::new(self.database.clone(), base);
192 await_with_runtime(async move {
193 match catalog.get_table(&identifier).await {
194 Ok(table) => {
195 let provider = PaimonTableProvider::try_new(table)?;
196 Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
197 }
198 Err(paimon::Error::TableNotExist { .. }) => Ok(None),
199 Err(e) => Err(to_datafusion_error(e)),
200 }
201 })
202 .await
203 }
204
205 fn table_exist(&self, name: &str) -> bool {
206 let (base, system_name) = system_tables::split_object_name(name);
207 if let Some(system_name) = system_name {
208 if !system_tables::is_registered(system_name) {
209 return false;
210 }
211 }
212
213 let catalog = Arc::clone(&self.catalog);
214 let identifier = Identifier::new(self.database.clone(), base.to_string());
215 block_on_with_runtime(
216 async move {
217 match catalog.get_table(&identifier).await {
218 Ok(_) => true,
219 Err(paimon::Error::TableNotExist { .. }) => false,
220 Err(_) => false,
221 }
222 },
223 "paimon catalog access thread panicked",
224 )
225 }
226
227 fn register_table(
228 &self,
229 _name: String,
230 table: Arc<dyn TableProvider>,
231 ) -> DFResult<Option<Arc<dyn TableProvider>>> {
232 Ok(Some(table))
235 }
236
237 fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
238 let catalog = Arc::clone(&self.catalog);
239 let identifier = Identifier::new(self.database.clone(), name);
240 block_on_with_runtime(
241 async move {
242 let table = match catalog.get_table(&identifier).await {
244 Ok(t) => t,
245 Err(paimon::Error::TableNotExist { .. }) => return Ok(None),
246 Err(e) => return Err(to_datafusion_error(e)),
247 };
248 let provider = PaimonTableProvider::try_new(table)?;
249 catalog
250 .drop_table(&identifier, false)
251 .await
252 .map_err(to_datafusion_error)?;
253 Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
254 },
255 "paimon catalog access thread panicked",
256 )
257 }
258}