1use std::collections::HashMap;
24use std::sync::Arc;
25
26use async_trait::async_trait;
27
28use crate::api::rest_api::RESTApi;
29use crate::api::rest_error::RestError;
30use crate::api::PagedList;
31use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP};
32use crate::common::{CatalogOptions, Options};
33use crate::error::Error;
34use crate::io::FileIO;
35use crate::spec::{Schema, SchemaChange, TableSchema};
36use crate::table::{RESTEnv, Table};
37use crate::Result;
38
39use super::rest_token_file_io::RESTTokenFileIO;
40
41pub struct RESTCatalog {
48 api: Arc<RESTApi>,
50 options: Options,
52 warehouse: String,
54 data_token_enabled: bool,
56}
57
58impl RESTCatalog {
59 pub async fn new(options: Options, config_required: bool) -> Result<Self> {
68 let warehouse = options
69 .get(CatalogOptions::WAREHOUSE)
70 .cloned()
71 .ok_or_else(|| RestError::BadRequest {
72 message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE),
73 })?;
74
75 let api = Arc::new(RESTApi::new(options.clone(), config_required).await?);
76
77 let data_token_enabled = api
78 .options()
79 .get(CatalogOptions::DATA_TOKEN_ENABLED)
80 .map(|v| v.eq_ignore_ascii_case("true"))
81 .unwrap_or(false);
82
83 let api_options = api.options().clone();
84
85 Ok(Self {
86 api,
87 options: api_options,
88 warehouse,
89 data_token_enabled,
90 })
91 }
92
93 pub fn warehouse(&self) -> &str {
95 &self.warehouse
96 }
97
98 pub fn options(&self) -> &Options {
100 &self.options
101 }
102
103 pub fn data_token_enabled(&self) -> bool {
105 self.data_token_enabled
106 }
107
108 pub async fn list_databases_paged(
110 &self,
111 max_results: Option<u32>,
112 page_token: Option<&str>,
113 database_name_pattern: Option<&str>,
114 ) -> Result<PagedList<String>> {
115 self.api
116 .list_databases_paged(max_results, page_token, database_name_pattern)
117 .await
118 }
119}
120
121#[async_trait]
126impl Catalog for RESTCatalog {
127 async fn list_databases(&self) -> Result<Vec<String>> {
130 self.api.list_databases().await
131 }
132
133 async fn create_database(
134 &self,
135 name: &str,
136 ignore_if_exists: bool,
137 properties: HashMap<String, String>,
138 ) -> Result<()> {
139 let result = self
140 .api
141 .create_database(name, Some(properties))
142 .await
143 .map_err(|e| map_rest_error_for_database(e, name));
144 ignore_error_if(result, |e| {
145 ignore_if_exists && matches!(e, Error::DatabaseAlreadyExist { .. })
146 })
147 }
148
149 async fn get_database(&self, name: &str) -> Result<Database> {
150 let response = self
151 .api
152 .get_database(name)
153 .await
154 .map_err(|e| map_rest_error_for_database(e, name))?;
155
156 let mut options = response.options;
157 if let Some(location) = response.location {
158 options.insert(DB_LOCATION_PROP.to_string(), location);
159 }
160
161 Ok(Database::new(name.to_string(), options, None))
162 }
163
164 async fn drop_database(
165 &self,
166 name: &str,
167 ignore_if_not_exists: bool,
168 cascade: bool,
169 ) -> Result<()> {
170 if !cascade {
172 let tables = match self.api.list_tables(name).await {
173 Ok(tables) => tables,
174 Err(err) => {
175 return ignore_error_if(Err(map_rest_error_for_database(err, name)), |e| {
176 ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
177 });
178 }
179 };
180 if !tables.is_empty() {
181 return Err(Error::DatabaseNotEmpty {
182 database: name.to_string(),
183 });
184 }
185 }
186
187 let result = self
188 .api
189 .drop_database(name)
190 .await
191 .map_err(|e| map_rest_error_for_database(e, name));
192 ignore_error_if(result, |e| {
193 ignore_if_not_exists && matches!(e, Error::DatabaseNotExist { .. })
194 })
195 }
196
197 async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
200 let response = self
201 .api
202 .get_table(identifier)
203 .await
204 .map_err(|e| map_rest_error_for_table(e, identifier))?;
205
206 let schema = response.schema.ok_or_else(|| Error::DataInvalid {
208 message: format!("Table {} response missing schema", identifier.full_name()),
209 source: None,
210 })?;
211
212 let schema_id = response.schema_id.ok_or_else(|| Error::DataInvalid {
213 message: format!(
214 "Table {} response missing schema_id",
215 identifier.full_name()
216 ),
217 source: None,
218 })?;
219 let table_schema = TableSchema::new(schema_id, &schema);
220
221 let table_path = response.path.ok_or_else(|| Error::DataInvalid {
223 message: format!("Table {} response missing path", identifier.full_name()),
224 source: None,
225 })?;
226
227 let is_external = response.is_external.ok_or_else(|| Error::DataInvalid {
229 message: format!(
230 "Table {} response missing is_external",
231 identifier.full_name()
232 ),
233 source: None,
234 })?;
235
236 let uuid = response.id.ok_or_else(|| Error::DataInvalid {
238 message: format!(
239 "Table {} response missing id (uuid)",
240 identifier.full_name()
241 ),
242 source: None,
243 })?;
244
245 let file_io = if self.data_token_enabled && !is_external {
248 let token_file_io =
250 RESTTokenFileIO::new(identifier.clone(), table_path.clone(), self.options.clone());
251 token_file_io.build_file_io().await?
252 } else {
253 FileIO::from_path(&table_path)?.build()?
255 };
256
257 let rest_env = RESTEnv::new(identifier.clone(), uuid, self.api.clone());
258
259 Ok(Table::new(
260 file_io,
261 identifier.clone(),
262 table_path,
263 table_schema,
264 Some(rest_env),
265 ))
266 }
267
268 async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
269 self.api
270 .list_tables(database_name)
271 .await
272 .map_err(|e| map_rest_error_for_database(e, database_name))
273 }
274
275 async fn create_table(
276 &self,
277 identifier: &Identifier,
278 creation: Schema,
279 ignore_if_exists: bool,
280 ) -> Result<()> {
281 let result = self
282 .api
283 .create_table(identifier, creation)
284 .await
285 .map_err(|e| map_rest_error_for_table(e, identifier));
286 ignore_error_if(result, |e| {
287 ignore_if_exists && matches!(e, Error::TableAlreadyExist { .. })
288 })
289 }
290
291 async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()> {
292 let result = self
293 .api
294 .drop_table(identifier)
295 .await
296 .map_err(|e| map_rest_error_for_table(e, identifier));
297 ignore_error_if(result, |e| {
298 ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
299 })
300 }
301
302 async fn rename_table(
303 &self,
304 from: &Identifier,
305 to: &Identifier,
306 ignore_if_not_exists: bool,
307 ) -> Result<()> {
308 let result = self
309 .api
310 .rename_table(from, to)
311 .await
312 .map_err(|e| map_rest_error_for_table(e, from))
313 .map_err(|e| match e {
315 Error::TableAlreadyExist { .. } => Error::TableAlreadyExist {
316 full_name: to.full_name(),
317 },
318 other => other,
319 });
320 ignore_error_if(result, |e| {
321 ignore_if_not_exists && matches!(e, Error::TableNotExist { .. })
322 })
323 }
324
325 async fn alter_table(
326 &self,
327 _identifier: &Identifier,
328 _changes: Vec<SchemaChange>,
329 _ignore_if_not_exists: bool,
330 ) -> Result<()> {
331 Err(Error::Unsupported {
333 message: "Alter table is not yet implemented for REST catalog".to_string(),
334 })
335 }
336}
337fn map_rest_error_for_database(err: Error, database_name: &str) -> Error {
347 match err {
348 Error::RestApi {
349 source: RestError::NoSuchResource { .. },
350 } => Error::DatabaseNotExist {
351 database: database_name.to_string(),
352 },
353 Error::RestApi {
354 source: RestError::AlreadyExists { .. },
355 } => Error::DatabaseAlreadyExist {
356 database: database_name.to_string(),
357 },
358 other => other,
359 }
360}
361
362fn map_rest_error_for_table(err: Error, identifier: &Identifier) -> Error {
368 match err {
369 Error::RestApi {
370 source: RestError::NoSuchResource { .. },
371 } => Error::TableNotExist {
372 full_name: identifier.full_name(),
373 },
374 Error::RestApi {
375 source: RestError::AlreadyExists { .. },
376 } => Error::TableAlreadyExist {
377 full_name: identifier.full_name(),
378 },
379 other => other,
380 }
381}
382
383fn ignore_error_if<F>(result: Result<()>, should_ignore: F) -> Result<()>
389where
390 F: Fn(&Error) -> bool,
391{
392 result.or_else(|err| {
393 if should_ignore(&err) {
394 Ok(())
395 } else {
396 Err(err)
397 }
398 })
399}