Skip to main content

paimon_datafusion/
catalog.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Paimon catalog integration for DataFusion.
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use datafusion::catalog::{CatalogProvider, SchemaProvider};
27use datafusion::datasource::TableProvider;
28use datafusion::error::Result as DFResult;
29use paimon::catalog::{Catalog, Identifier};
30
31use crate::error::to_datafusion_error;
32use crate::runtime::{await_with_runtime, block_on_with_runtime};
33use crate::system_tables;
34use crate::table::PaimonTableProvider;
35
36/// Provides an interface to manage and access multiple schemas (databases)
37/// within a Paimon [`Catalog`].
38///
39/// This provider uses lazy loading - databases and tables are fetched
40/// on-demand from the catalog, ensuring data is always fresh.
41pub struct PaimonCatalogProvider {
42    /// Reference to the Paimon catalog.
43    catalog: Arc<dyn Catalog>,
44}
45
46impl Debug for PaimonCatalogProvider {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("PaimonCatalogProvider").finish()
49    }
50}
51
52impl PaimonCatalogProvider {
53    /// Creates a new [`PaimonCatalogProvider`].
54    ///
55    /// All data is loaded lazily when accessed.
56    pub fn new(catalog: Arc<dyn Catalog>) -> Self {
57        PaimonCatalogProvider { catalog }
58    }
59}
60
61impl CatalogProvider for PaimonCatalogProvider {
62    fn as_any(&self) -> &dyn Any {
63        self
64    }
65
66    fn schema_names(&self) -> Vec<String> {
67        let catalog = Arc::clone(&self.catalog);
68        block_on_with_runtime(
69            async move { catalog.list_databases().await.unwrap_or_default() },
70            "paimon catalog access thread panicked",
71        )
72    }
73
74    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
75        let catalog = Arc::clone(&self.catalog);
76        let name = name.to_string();
77        block_on_with_runtime(
78            async move {
79                match catalog.get_database(&name).await {
80                    Ok(_) => Some(
81                        Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
82                            as Arc<dyn SchemaProvider>,
83                    ),
84                    Err(paimon::Error::DatabaseNotExist { .. }) => None,
85                    Err(_) => None,
86                }
87            },
88            "paimon catalog access thread panicked",
89        )
90    }
91
92    fn register_schema(
93        &self,
94        name: &str,
95        _schema: Arc<dyn SchemaProvider>,
96    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
97        let catalog = Arc::clone(&self.catalog);
98        let name = name.to_string();
99        block_on_with_runtime(
100            async move {
101                catalog
102                    .create_database(&name, false, HashMap::new())
103                    .await
104                    .map_err(to_datafusion_error)?;
105                Ok(Some(
106                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
107                        as Arc<dyn SchemaProvider>,
108                ))
109            },
110            "paimon catalog access thread panicked",
111        )
112    }
113
114    fn deregister_schema(
115        &self,
116        name: &str,
117        cascade: bool,
118    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
119        let catalog = Arc::clone(&self.catalog);
120        let name = name.to_string();
121        block_on_with_runtime(
122            async move {
123                catalog
124                    .drop_database(&name, false, cascade)
125                    .await
126                    .map_err(to_datafusion_error)?;
127                Ok(Some(
128                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
129                        as Arc<dyn SchemaProvider>,
130                ))
131            },
132            "paimon catalog access thread panicked",
133        )
134    }
135}
136
137/// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing
138/// access to table providers within a specific database.
139///
140/// Tables are loaded lazily when accessed via the `table()` method.
141pub struct PaimonSchemaProvider {
142    /// Reference to the Paimon catalog.
143    catalog: Arc<dyn Catalog>,
144    /// Database name this schema represents.
145    database: String,
146}
147
148impl Debug for PaimonSchemaProvider {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("PaimonSchemaProvider")
151            .field("database", &self.database)
152            .finish()
153    }
154}
155
156impl PaimonSchemaProvider {
157    /// Creates a new [`PaimonSchemaProvider`] for the given database.
158    pub fn new(catalog: Arc<dyn Catalog>, database: String) -> Self {
159        PaimonSchemaProvider { catalog, database }
160    }
161}
162
163#[async_trait]
164impl SchemaProvider for PaimonSchemaProvider {
165    fn as_any(&self) -> &dyn Any {
166        self
167    }
168
169    fn table_names(&self) -> Vec<String> {
170        let catalog = Arc::clone(&self.catalog);
171        let database = self.database.clone();
172        block_on_with_runtime(
173            async move { catalog.list_tables(&database).await.unwrap_or_default() },
174            "paimon catalog access thread panicked",
175        )
176    }
177
178    async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
179        let (base, system_name) = system_tables::split_object_name(name);
180        if let Some(system_name) = system_name {
181            return await_with_runtime(system_tables::load(
182                Arc::clone(&self.catalog),
183                self.database.clone(),
184                base.to_string(),
185                system_name.to_string(),
186            ))
187            .await;
188        }
189
190        let catalog = Arc::clone(&self.catalog);
191        let identifier = Identifier::new(self.database.clone(), base);
192        await_with_runtime(async move {
193            match catalog.get_table(&identifier).await {
194                Ok(table) => {
195                    let provider = PaimonTableProvider::try_new(table)?;
196                    Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
197                }
198                Err(paimon::Error::TableNotExist { .. }) => Ok(None),
199                Err(e) => Err(to_datafusion_error(e)),
200            }
201        })
202        .await
203    }
204
205    fn table_exist(&self, name: &str) -> bool {
206        let (base, system_name) = system_tables::split_object_name(name);
207        if let Some(system_name) = system_name {
208            if !system_tables::is_registered(system_name) {
209                return false;
210            }
211        }
212
213        let catalog = Arc::clone(&self.catalog);
214        let identifier = Identifier::new(self.database.clone(), base.to_string());
215        block_on_with_runtime(
216            async move {
217                match catalog.get_table(&identifier).await {
218                    Ok(_) => true,
219                    Err(paimon::Error::TableNotExist { .. }) => false,
220                    Err(_) => false,
221                }
222            },
223            "paimon catalog access thread panicked",
224        )
225    }
226
227    fn register_table(
228        &self,
229        _name: String,
230        table: Arc<dyn TableProvider>,
231    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
232        // DataFusion calls register_table after table creation, so we just
233        // acknowledge it here.
234        Ok(Some(table))
235    }
236
237    fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
238        let catalog = Arc::clone(&self.catalog);
239        let identifier = Identifier::new(self.database.clone(), name);
240        block_on_with_runtime(
241            async move {
242                // Try to get the table first so we can return it.
243                let table = match catalog.get_table(&identifier).await {
244                    Ok(t) => t,
245                    Err(paimon::Error::TableNotExist { .. }) => return Ok(None),
246                    Err(e) => return Err(to_datafusion_error(e)),
247                };
248                let provider = PaimonTableProvider::try_new(table)?;
249                catalog
250                    .drop_table(&identifier, false)
251                    .await
252                    .map_err(to_datafusion_error)?;
253                Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
254            },
255            "paimon catalog access thread panicked",
256        )
257    }
258}