// File: paimon/catalog/rest/rest_catalog.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! REST catalog implementation for Apache Paimon.
//!
//! This module provides a REST-based catalog that communicates with
//! a Paimon REST catalog server for database and table CRUD operations.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;

use crate::api::rest_api::RESTApi;
use crate::api::rest_error::RestError;
use crate::api::PagedList;
use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP};
use crate::common::{CatalogOptions, Options};
use crate::error::Error;
use crate::io::FileIO;
use crate::spec::{Schema, SchemaChange, TableSchema};
use crate::table::{RESTEnv, Table};
use crate::Result;

use super::rest_token_file_io::RESTTokenFileIO;

41/// REST catalog implementation.
42///
43/// This catalog communicates with a Paimon REST catalog server
44/// for all metadata operations (database and table CRUD).
45///
46/// Corresponds to Python `RESTCatalog` in `pypaimon/catalog/rest/rest_catalog.py`.
47pub struct RESTCatalog {
48    /// The REST API client (shared with RESTEnv).
49    api: Arc<RESTApi>,
50    /// Catalog configuration options.
51    options: Options,
52    /// Warehouse path.
53    warehouse: String,
54    /// Whether data token is enabled for FileIO construction.
55    data_token_enabled: bool,
56}
57
58impl RESTCatalog {
59    /// Create a new REST catalog.
60    ///
61    /// # Arguments
62    /// * `options` - Configuration options containing URI, warehouse, etc.
63    /// * `config_required` - Whether to fetch config from server and merge with options.
64    ///
65    /// # Errors
66    /// Returns an error if required options are missing or if initialization fails.
67    pub async fn new(options: Options, config_required: bool) -> Result<Self> {
68        let warehouse = options
69            .get(CatalogOptions::WAREHOUSE)
70            .cloned()
71            .ok_or_else(|| RestError::BadRequest {
72                message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE),
73            })?;
74
75        let api = Arc::new(RESTApi::new(options.clone(), config_required).await?);
76
77        let data_token_enabled = api
78            .options()
79            .get(CatalogOptions::DATA_TOKEN_ENABLED)
80            .map(|v| v.eq_ignore_ascii_case("true"))
81            .unwrap_or(false);
82
83        let api_options = api.options().clone();
84
85        Ok(Self {
86            api,
87            options: api_options,
88            warehouse,
89            data_token_enabled,
90        })
91    }
92
93    /// Get the warehouse path.
94    pub fn warehouse(&self) -> &str {
95        &self.warehouse
96    }
97
98    /// Get the catalog options.
99    pub fn options(&self) -> &Options {
100        &self.options
101    }
102
103    /// Whether data token is enabled.
104    pub fn data_token_enabled(&self) -> bool {
105        self.data_token_enabled
106    }
107
108    /// List databases with pagination.
109    pub async fn list_databases_paged(
110        &self,
111        max_results: Option<u32>,
112        page_token: Option<&str>,
113        database_name_pattern: Option<&str>,
114    ) -> Result<PagedList<String>> {
115        self.api
116            .list_databases_paged(max_results, page_token, database_name_pattern)
117            .await
118    }
119}
120
// ============================================================================
// Catalog trait implementation
// ============================================================================

125#[async_trait]
126impl Catalog for RESTCatalog {
127    // ======================= database methods ===============================
128
129    async fn list_databases(&self) -> Result<Vec<String>> {
130        self.api.list_databases().await
131    }
132
133    async fn create_database(
134        &self,
135        name: &str,
136        ignore_if_exists: bool,
137        properties: HashMap<String, String>,
138    ) -> Result<()> {
139        let result = self
140            .api
141            .create_database(name, Some(properties))
142            .await
143            .map_err(|e| map_rest_error_for_database(e, name));
144        ignore_error_if(result, |e| {
145            ignore_if_exists && matches!(e, Error::DatabaseAlreadyExist { .. })
146        })
147    }
148
149    async fn get_database(&self, name: &str) -> Result<Database> {
150        let response = self
151            .api
152            .get_database(name)
153            .await
154            .map_err(|e| map_rest_error_for_database(e, name))?;
155
156        let mut options = response.options;
157        if let Some(location) = response.location {
158            options.insert(DB_LOCATION_PROP.to_string(), location);
159        }
160
161        Ok(Database::new(name.to_string(), options, None))
162    }
163
164    async fn drop_database(
165        &self,
166        name: &str,
167        ignore_if_not_exists: bool,
168        cascade: bool,
169    ) -> Result<()> {
170        // If not cascade, check if database is empty first
171        if !cascade {
172            let tables = match self.api.list_tables(name).await {
173                Ok(tables) => tables,
174                Err(err) => {
175                    return ignore_error_if(Err(map_rest_error_for_database(err, name)), |e| {
176                        ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
177                    });
178                }
179            };
180            if !tables.is_empty() {
181                return Err(Error::DatabaseNotEmpty {
182                    database: name.to_string(),
183                });
184            }
185        }
186
187        let result = self
188            .api
189            .drop_database(name)
190            .await
191            .map_err(|e| map_rest_error_for_database(e, name));
192        ignore_error_if(result, |e| {
193            ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
194        })
195    }
196
197    // ======================= table methods ===============================
198
199    async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
200        let response = self
201            .api
202            .get_table(identifier)
203            .await
204            .map_err(|e| map_rest_error_for_table(e, identifier))?;
205
206        // Extract schema from response
207        let schema = response.schema.ok_or_else(|| Error::DataInvalid {
208            message: format!("Table {} response missing schema", identifier.full_name()),
209            source: None,
210        })?;
211
212        let schema_id = response.schema_id.ok_or_else(|| Error::DataInvalid {
213            message: format!(
214                "Table {} response missing schema_id",
215                identifier.full_name()
216            ),
217            source: None,
218        })?;
219        let table_schema = TableSchema::new(schema_id, &schema);
220
221        // Extract table path from response
222        let table_path = response.path.ok_or_else(|| Error::DataInvalid {
223            message: format!("Table {} response missing path", identifier.full_name()),
224            source: None,
225        })?;
226
227        // Check if the table is external
228        let is_external = response.is_external.ok_or_else(|| Error::DataInvalid {
229            message: format!(
230                "Table {} response missing is_external",
231                identifier.full_name()
232            ),
233            source: None,
234        })?;
235
236        // Extract table uuid for RESTEnv
237        let uuid = response.id.ok_or_else(|| Error::DataInvalid {
238            message: format!(
239                "Table {} response missing id (uuid)",
240                identifier.full_name()
241            ),
242            source: None,
243        })?;
244
245        // Build FileIO based on data_token_enabled and is_external
246        // TODO Support token cache and direct oss access
247        let file_io = if self.data_token_enabled && !is_external {
248            // Use RESTTokenFileIO to get token-based FileIO
249            let token_file_io =
250                RESTTokenFileIO::new(identifier.clone(), table_path.clone(), self.options.clone());
251            token_file_io.build_file_io().await?
252        } else {
253            // Use standard FileIO from path
254            FileIO::from_path(&table_path)?.build()?
255        };
256
257        let rest_env = RESTEnv::new(identifier.clone(), uuid, self.api.clone());
258
259        Ok(Table::new(
260            file_io,
261            identifier.clone(),
262            table_path,
263            table_schema,
264            Some(rest_env),
265        ))
266    }
267
268    async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
269        self.api
270            .list_tables(database_name)
271            .await
272            .map_err(|e| map_rest_error_for_database(e, database_name))
273    }
274
275    async fn create_table(
276        &self,
277        identifier: &Identifier,
278        creation: Schema,
279        ignore_if_exists: bool,
280    ) -> Result<()> {
281        let result = self
282            .api
283            .create_table(identifier, creation)
284            .await
285            .map_err(|e| map_rest_error_for_table(e, identifier));
286        ignore_error_if(result, |e| {
287            ignore_if_exists && matches!(e, Error::TableAlreadyExist { .. })
288        })
289    }
290
291    async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()> {
292        let result = self
293            .api
294            .drop_table(identifier)
295            .await
296            .map_err(|e| map_rest_error_for_table(e, identifier));
297        ignore_error_if(result, |e| {
298            ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
299        })
300    }
301
302    async fn rename_table(
303        &self,
304        from: &Identifier,
305        to: &Identifier,
306        ignore_if_not_exists: bool,
307    ) -> Result<()> {
308        let result = self
309            .api
310            .rename_table(from, to)
311            .await
312            .map_err(|e| map_rest_error_for_table(e, from))
313            // Remap TableAlreadyExist to use destination identifier
314            .map_err(|e| match e {
315                Error::TableAlreadyExist { .. } => Error::TableAlreadyExist {
316                    full_name: to.full_name(),
317                },
318                other => other,
319            });
320        ignore_error_if(result, |e| {
321            ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
322        })
323    }
324
325    async fn alter_table(
326        &self,
327        _identifier: &Identifier,
328        _changes: Vec<SchemaChange>,
329        _ignore_if_not_exists: bool,
330    ) -> Result<()> {
331        // TODO: Implement alter_table when RESTApi supports it
332        Err(Error::Unsupported {
333            message: "Alter table is not yet implemented for REST catalog".to_string(),
334        })
335    }
336}

// ============================================================================
// Error mapping helpers
// ============================================================================

341/// Map a REST API error to a catalog-level database error.
342///
343/// Converts `RestError::NoSuchResource` -> `Error::DatabaseNotExist`,
344/// `RestError::AlreadyExists` -> `Error::DatabaseAlreadyExist`,
345/// and passes through other errors via `Error::RestApi`.
346fn map_rest_error_for_database(err: Error, database_name: &str) -> Error {
347    match err {
348        Error::RestApi {
349            source: RestError::NoSuchResource { .. },
350        } => Error::DatabaseNotExist {
351            database: database_name.to_string(),
352        },
353        Error::RestApi {
354            source: RestError::AlreadyExists { .. },
355        } => Error::DatabaseAlreadyExist {
356            database: database_name.to_string(),
357        },
358        other => other,
359    }
360}
361
362/// Map a REST API error to a catalog-level table error.
363///
364/// Converts `RestError::NoSuchResource` -> `Error::TableNotExist`,
365/// `RestError::AlreadyExists` -> `Error::TableAlreadyExist`,
366/// and passes through other errors via `Error::RestApi`.
367fn map_rest_error_for_table(err: Error, identifier: &Identifier) -> Error {
368    match err {
369        Error::RestApi {
370            source: RestError::NoSuchResource { .. },
371        } => Error::TableNotExist {
372            full_name: identifier.full_name(),
373        },
374        Error::RestApi {
375            source: RestError::AlreadyExists { .. },
376        } => Error::TableAlreadyExist {
377            full_name: identifier.full_name(),
378        },
379        other => other,
380    }
381}
382
383/// Execute a fallible operation and ignore a specific error variant.
384///
385/// If the operation succeeds, returns `Ok(())`.
386/// If it fails with an error that `should_ignore` returns `true` for, returns `Ok(())`.
387/// Otherwise, returns the error.
388fn ignore_error_if<F>(result: Result<()>, should_ignore: F) -> Result<()>
389where
390    F: Fn(&Error) -> bool,
391{
392    result.or_else(|err| {
393        if should_ignore(&err) {
394            Ok(())
395        } else {
396            Err(err)
397        }
398    })
399}