1use std::collections::{HashMap, HashSet};
19use std::time::Duration;
20
21use async_trait::async_trait;
22use iceberg::io::FileIO;
23use iceberg::spec::{TableMetadata, TableMetadataBuilder};
24use iceberg::table::Table;
25use iceberg::{
26 Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
27 TableCreation, TableIdent,
28};
29use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
30use sqlx::{Any, AnyPool, Row, Transaction};
31use typed_builder::TypedBuilder;
32
33use crate::error::{
34 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
// Table holding one row per Iceberg table, and the columns it is created with.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value stored in the record-type column for rows that describe a table.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Table holding one row per namespace property, and its namespace-specific columns.
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted when a table is created without an explicit location.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
52
53static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; static TEST_BEFORE_ACQUIRE: bool = true; #[derive(Debug, TypedBuilder)]
66pub struct SqlCatalogConfig {
67 uri: String,
68 name: String,
69 warehouse_location: String,
70 file_io: FileIO,
71 sql_bind_style: SqlBindStyle,
72 #[builder(default)]
73 props: HashMap<String, String>,
74}
75
76#[derive(Debug)]
77pub struct SqlCatalog {
79 name: String,
80 connection: AnyPool,
81 warehouse_location: String,
82 fileio: FileIO,
83 sql_bind_style: SqlBindStyle,
84}
85
/// Placeholder syntax used when binding values into SQL statements.
///
/// Statements in this file are written with `?` placeholders and rewritten
/// according to the selected style before execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlBindStyle {
    /// Numbered placeholders: `$1`, `$2`, ...
    DollarNumeric,
    /// Positional `?` placeholders, passed through unchanged.
    QMark,
}
94
95impl SqlCatalog {
96 pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
98 install_default_drivers();
99 let max_connections: u32 = config
100 .props
101 .get("pool.max-connections")
102 .map(|v| v.parse().unwrap())
103 .unwrap_or(MAX_CONNECTIONS);
104 let idle_timeout: u64 = config
105 .props
106 .get("pool.idle-timeout")
107 .map(|v| v.parse().unwrap())
108 .unwrap_or(IDLE_TIMEOUT);
109 let test_before_acquire: bool = config
110 .props
111 .get("pool.test-before-acquire")
112 .map(|v| v.parse().unwrap())
113 .unwrap_or(TEST_BEFORE_ACQUIRE);
114
115 let pool = AnyPoolOptions::new()
116 .max_connections(max_connections)
117 .idle_timeout(Duration::from_secs(idle_timeout))
118 .test_before_acquire(test_before_acquire)
119 .connect(&config.uri)
120 .await
121 .map_err(from_sqlx_error)?;
122
123 sqlx::query(&format!(
124 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
125 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
126 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
127 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
128 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
129 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
130 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
131 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
132 ))
133 .execute(&pool)
134 .await
135 .map_err(from_sqlx_error)?;
136
137 sqlx::query(&format!(
138 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
139 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
140 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
141 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
142 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
143 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
144 ))
145 .execute(&pool)
146 .await
147 .map_err(from_sqlx_error)?;
148
149 Ok(SqlCatalog {
150 name: config.name.to_owned(),
151 connection: pool,
152 warehouse_location: config.warehouse_location,
153 fileio: config.file_io,
154 sql_bind_style: config.sql_bind_style,
155 })
156 }
157
158 fn replace_placeholders(&self, query: &str) -> String {
160 match self.sql_bind_style {
161 SqlBindStyle::DollarNumeric => {
162 let mut count = 1;
163 query
164 .chars()
165 .fold(String::with_capacity(query.len()), |mut acc, c| {
166 if c == '?' {
167 acc.push('$');
168 acc.push_str(&count.to_string());
169 count += 1;
170 } else {
171 acc.push(c);
172 }
173 acc
174 })
175 }
176 _ => query.to_owned(),
177 }
178 }
179
180 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
182 let query_with_placeholders = self.replace_placeholders(query);
183
184 let mut sqlx_query = sqlx::query(&query_with_placeholders);
185 for arg in args {
186 sqlx_query = sqlx_query.bind(arg);
187 }
188
189 sqlx_query
190 .fetch_all(&self.connection)
191 .await
192 .map_err(from_sqlx_error)
193 }
194
195 async fn execute(
197 &self,
198 query: &str,
199 args: Vec<Option<&str>>,
200 transaction: Option<&mut Transaction<'_, Any>>,
201 ) -> Result<AnyQueryResult> {
202 let query_with_placeholders = self.replace_placeholders(query);
203
204 let mut sqlx_query = sqlx::query(&query_with_placeholders);
205 for arg in args {
206 sqlx_query = sqlx_query.bind(arg);
207 }
208
209 match transaction {
210 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
211 None => {
212 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
213 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
214 let _ = tx.commit().await.map_err(from_sqlx_error);
215 result
216 }
217 }
218 }
219}
220
221#[async_trait]
222impl Catalog for SqlCatalog {
223 async fn list_namespaces(
224 &self,
225 parent: Option<&NamespaceIdent>,
226 ) -> Result<Vec<NamespaceIdent>> {
227 let all_namespaces_stmt = format!(
229 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
230 FROM {CATALOG_TABLE_NAME}
231 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
232 UNION
233 SELECT {NAMESPACE_FIELD_NAME}
234 FROM {NAMESPACE_TABLE_NAME}
235 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
236 );
237
238 let namespace_rows = self
239 .fetch_rows(&all_namespaces_stmt, vec![
240 Some(&self.name),
241 Some(&self.name),
242 ])
243 .await?;
244
245 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
246
247 if let Some(parent) = parent {
248 if self.namespace_exists(parent).await? {
249 let parent_str = parent.join(".");
250
251 for row in namespace_rows.iter() {
252 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
253 if nsp != parent_str && nsp.starts_with(&parent_str) {
255 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
256 }
257 }
258
259 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
260 } else {
261 no_such_namespace_err(parent)
262 }
263 } else {
264 for row in namespace_rows.iter() {
265 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
266 let mut levels = nsp.split(".").collect::<Vec<&str>>();
267 if !levels.is_empty() {
268 let first_level = levels.drain(..1).collect::<Vec<&str>>();
269 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
270 }
271 }
272
273 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
274 }
275 }
276
277 async fn create_namespace(
278 &self,
279 namespace: &NamespaceIdent,
280 properties: HashMap<String, String>,
281 ) -> Result<Namespace> {
282 let exists = self.namespace_exists(namespace).await?;
283
284 if exists {
285 return Err(Error::new(
286 iceberg::ErrorKind::Unexpected,
287 format!("Namespace {:?} already exists", namespace),
288 ));
289 }
290
291 let namespace_str = namespace.join(".");
292 let insert = format!(
293 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
294 VALUES (?, ?, ?, ?)");
295 if !properties.is_empty() {
296 let mut insert_properties = properties.clone();
297 insert_properties.insert("exists".to_string(), "true".to_string());
298
299 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
300 let mut insert_stmt = insert.clone();
301 for (index, (key, value)) in insert_properties.iter().enumerate() {
302 query_args.extend_from_slice(&[
303 Some(self.name.as_str()),
304 Some(namespace_str.as_str()),
305 Some(key.as_str()),
306 Some(value.as_str()),
307 ]);
308 if index > 0 {
309 insert_stmt.push_str(", (?, ?, ?, ?)");
310 }
311 }
312
313 self.execute(&insert_stmt, query_args, None).await?;
314
315 Ok(Namespace::with_properties(
316 namespace.clone(),
317 insert_properties,
318 ))
319 } else {
320 self.execute(
322 &insert,
323 vec![
324 Some(&self.name),
325 Some(&namespace_str),
326 Some("exists"),
327 Some("true"),
328 ],
329 None,
330 )
331 .await?;
332 Ok(Namespace::with_properties(namespace.clone(), properties))
333 }
334 }
335
336 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
337 let exists = self.namespace_exists(namespace).await?;
338 if exists {
339 let namespace_props = self
340 .fetch_rows(
341 &format!(
342 "SELECT
343 {NAMESPACE_FIELD_NAME},
344 {NAMESPACE_FIELD_PROPERTY_KEY},
345 {NAMESPACE_FIELD_PROPERTY_VALUE}
346 FROM {NAMESPACE_TABLE_NAME}
347 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
348 AND {NAMESPACE_FIELD_NAME} = ?"
349 ),
350 vec![Some(&self.name), Some(&namespace.join("."))],
351 )
352 .await?;
353
354 let mut properties = HashMap::with_capacity(namespace_props.len());
355
356 for row in namespace_props {
357 let key = row
358 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
359 .map_err(from_sqlx_error)?;
360 let value = row
361 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
362 .map_err(from_sqlx_error)?;
363
364 properties.insert(key, value);
365 }
366
367 Ok(Namespace::with_properties(namespace.clone(), properties))
368 } else {
369 no_such_namespace_err(namespace)
370 }
371 }
372
373 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
374 let namespace_str = namespace.join(".");
375
376 let table_namespaces = self
377 .fetch_rows(
378 &format!(
379 "SELECT 1 FROM {CATALOG_TABLE_NAME}
380 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
381 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
382 LIMIT 1"
383 ),
384 vec![Some(&self.name), Some(&namespace_str)],
385 )
386 .await?;
387
388 if !table_namespaces.is_empty() {
389 Ok(true)
390 } else {
391 let namespaces = self
392 .fetch_rows(
393 &format!(
394 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
395 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
396 AND {NAMESPACE_FIELD_NAME} = ?
397 LIMIT 1"
398 ),
399 vec![Some(&self.name), Some(&namespace_str)],
400 )
401 .await?;
402 if !namespaces.is_empty() {
403 Ok(true)
404 } else {
405 Ok(false)
406 }
407 }
408 }
409
410 async fn update_namespace(
411 &self,
412 namespace: &NamespaceIdent,
413 properties: HashMap<String, String>,
414 ) -> Result<()> {
415 let exists = self.namespace_exists(namespace).await?;
416 if exists {
417 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
418 let namespace_str = namespace.join(".");
419
420 let mut updates = vec![];
421 let mut inserts = vec![];
422
423 for (key, value) in properties.iter() {
424 if existing_properties.contains_key(key) {
425 if existing_properties.get(key) != Some(value) {
426 updates.push((key, value));
427 }
428 } else {
429 inserts.push((key, value));
430 }
431 }
432
433 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
434 let update_stmt = format!(
435 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
436 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
437 AND {NAMESPACE_FIELD_NAME} = ?
438 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
439 );
440
441 let insert_stmt = format!(
442 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
443 VALUES (?, ?, ?, ?)"
444 );
445
446 for (key, value) in updates {
447 self.execute(
448 &update_stmt,
449 vec![
450 Some(value),
451 Some(&self.name),
452 Some(&namespace_str),
453 Some(key),
454 ],
455 Some(&mut tx),
456 )
457 .await?;
458 }
459
460 for (key, value) in inserts {
461 self.execute(
462 &insert_stmt,
463 vec![
464 Some(&self.name),
465 Some(&namespace_str),
466 Some(key),
467 Some(value),
468 ],
469 Some(&mut tx),
470 )
471 .await?;
472 }
473
474 let _ = tx.commit().await.map_err(from_sqlx_error)?;
475
476 Ok(())
477 } else {
478 no_such_namespace_err(namespace)
479 }
480 }
481
482 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
483 let exists = self.namespace_exists(namespace).await?;
484 if exists {
485 let tables = self.list_tables(namespace).await?;
487 if !tables.is_empty() {
488 return Err(Error::new(
489 iceberg::ErrorKind::Unexpected,
490 format!(
491 "Namespace {:?} is not empty. {} tables exist.",
492 namespace,
493 tables.len()
494 ),
495 ));
496 }
497
498 self.execute(
499 &format!(
500 "DELETE FROM {NAMESPACE_TABLE_NAME}
501 WHERE {NAMESPACE_FIELD_NAME} = ?
502 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
503 ),
504 vec![Some(&namespace.join(".")), Some(&self.name)],
505 None,
506 )
507 .await?;
508
509 Ok(())
510 } else {
511 no_such_namespace_err(namespace)
512 }
513 }
514
515 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
516 let exists = self.namespace_exists(namespace).await?;
517 if exists {
518 let rows = self
519 .fetch_rows(
520 &format!(
521 "SELECT {CATALOG_FIELD_TABLE_NAME},
522 {CATALOG_FIELD_TABLE_NAMESPACE}
523 FROM {CATALOG_TABLE_NAME}
524 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
525 AND {CATALOG_FIELD_CATALOG_NAME} = ?
526 AND (
527 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
528 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
529 )",
530 ),
531 vec![Some(&namespace.join(".")), Some(&self.name)],
532 )
533 .await?;
534
535 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
536
537 for row in rows.iter() {
538 let tbl = row
539 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
540 .map_err(from_sqlx_error)?;
541 let ns_strs = row
542 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
543 .map_err(from_sqlx_error)?;
544 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
545 tables.insert(TableIdent::new(ns, tbl));
546 }
547
548 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
549 } else {
550 no_such_namespace_err(namespace)
551 }
552 }
553
554 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
555 let namespace = identifier.namespace().join(".");
556 let table_name = identifier.name();
557 let table_counts = self
558 .fetch_rows(
559 &format!(
560 "SELECT 1
561 FROM {CATALOG_TABLE_NAME}
562 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
563 AND {CATALOG_FIELD_CATALOG_NAME} = ?
564 AND {CATALOG_FIELD_TABLE_NAME} = ?
565 AND (
566 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
567 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
568 )"
569 ),
570 vec![Some(&namespace), Some(&self.name), Some(table_name)],
571 )
572 .await?;
573
574 if !table_counts.is_empty() {
575 Ok(true)
576 } else {
577 Ok(false)
578 }
579 }
580
581 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
582 if !self.table_exists(identifier).await? {
583 return no_such_table_err(identifier);
584 }
585
586 self.execute(
587 &format!(
588 "DELETE FROM {CATALOG_TABLE_NAME}
589 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
590 AND {CATALOG_FIELD_TABLE_NAME} = ?
591 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
592 AND (
593 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
594 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
595 )"
596 ),
597 vec![
598 Some(&self.name),
599 Some(identifier.name()),
600 Some(&identifier.namespace().join(".")),
601 ],
602 None,
603 )
604 .await?;
605
606 Ok(())
607 }
608
609 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
610 if !self.table_exists(identifier).await? {
611 return no_such_table_err(identifier);
612 }
613
614 let rows = self
615 .fetch_rows(
616 &format!(
617 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
618 FROM {CATALOG_TABLE_NAME}
619 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
620 AND {CATALOG_FIELD_TABLE_NAME} = ?
621 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
622 AND (
623 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
624 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
625 )"
626 ),
627 vec![
628 Some(&self.name),
629 Some(identifier.name()),
630 Some(&identifier.namespace().join(".")),
631 ],
632 )
633 .await?;
634
635 if rows.is_empty() {
636 return no_such_table_err(identifier);
637 }
638
639 let row = &rows[0];
640 let tbl_metadata_location = row
641 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
642 .map_err(from_sqlx_error)?;
643
644 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
645
646 Ok(Table::builder()
647 .file_io(self.fileio.clone())
648 .identifier(identifier.clone())
649 .metadata_location(tbl_metadata_location)
650 .metadata(metadata)
651 .build()?)
652 }
653
654 async fn create_table(
655 &self,
656 namespace: &NamespaceIdent,
657 creation: TableCreation,
658 ) -> Result<Table> {
659 if !self.namespace_exists(namespace).await? {
660 return no_such_namespace_err(namespace);
661 }
662
663 let tbl_name = creation.name.clone();
664 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
665
666 if self.table_exists(&tbl_ident).await? {
667 return table_already_exists_err(&tbl_ident);
668 }
669
670 let (tbl_creation, location) = match creation.location.clone() {
671 Some(location) => (creation, location),
672 None => {
673 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
676 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
677 Some(location) => location.clone(),
678 None => {
679 format!(
680 "{}/{}",
681 self.warehouse_location.clone(),
682 namespace.join("/")
683 )
684 }
685 };
686
687 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
688
689 (
690 TableCreation {
691 location: Some(tbl_location.clone()),
692 ..creation
693 },
694 tbl_location,
695 )
696 }
697 };
698
699 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
700 .build()?
701 .metadata;
702 let tbl_metadata_location =
703 MetadataLocation::new_with_table_location(location.clone()).to_string();
704
705 tbl_metadata
706 .write_to(&self.fileio, &tbl_metadata_location)
707 .await?;
708
709 self.execute(&format!(
710 "INSERT INTO {CATALOG_TABLE_NAME}
711 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
712 VALUES (?, ?, ?, ?, ?)
713 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
714
715 Ok(Table::builder()
716 .file_io(self.fileio.clone())
717 .metadata_location(tbl_metadata_location)
718 .identifier(tbl_ident)
719 .metadata(tbl_metadata)
720 .build()?)
721 }
722
723 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
724 if src == dest {
725 return Ok(());
726 }
727
728 if !self.table_exists(src).await? {
729 return no_such_table_err(src);
730 }
731
732 if !self.namespace_exists(dest.namespace()).await? {
733 return no_such_namespace_err(dest.namespace());
734 }
735
736 if self.table_exists(dest).await? {
737 return table_already_exists_err(dest);
738 }
739
740 self.execute(
741 &format!(
742 "UPDATE {CATALOG_TABLE_NAME}
743 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
744 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
745 AND {CATALOG_FIELD_TABLE_NAME} = ?
746 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
747 AND (
748 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
749 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
750 )"
751 ),
752 vec![
753 Some(dest.name()),
754 Some(&dest.namespace().join(".")),
755 Some(&self.name),
756 Some(src.name()),
757 Some(&src.namespace().join(".")),
758 ],
759 None,
760 )
761 .await?;
762
763 Ok(())
764 }
765
766 async fn register_table(
767 &self,
768 _table_ident: &TableIdent,
769 _metadata_location: String,
770 ) -> Result<Table> {
771 Err(Error::new(
772 ErrorKind::FeatureUnsupported,
773 "Registering a table is not supported yet",
774 ))
775 }
776
777 async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
778 Err(Error::new(
779 ErrorKind::FeatureUnsupported,
780 "Updating a table is not supported yet",
781 ))
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use std::collections::{HashMap, HashSet};
788 use std::hash::Hash;
789
790 use iceberg::io::FileIOBuilder;
791 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
792 use iceberg::table::Table;
793 use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
794 use itertools::Itertools;
795 use regex::Regex;
796 use sqlx::migrate::MigrateDatabase;
797 use tempfile::TempDir;
798
799 use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
800 use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
801
802 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
803
804 fn temp_path() -> String {
805 let temp_dir = TempDir::new().unwrap();
806 temp_dir.path().to_str().unwrap().to_string()
807 }
808
809 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
810 HashSet::from_iter(vec)
811 }
812
813 fn default_properties() -> HashMap<String, String> {
814 HashMap::from([("exists".to_string(), "true".to_string())])
815 }
816
817 async fn new_sql_catalog(warehouse_location: String) -> impl Catalog {
818 let sql_lite_uri = format!("sqlite:{}", temp_path());
819 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
820
821 let config = SqlCatalogConfig::builder()
822 .uri(sql_lite_uri.to_string())
823 .name("iceberg".to_string())
824 .warehouse_location(warehouse_location)
825 .file_io(FileIOBuilder::new_fs_io().build().unwrap())
826 .sql_bind_style(SqlBindStyle::QMark)
827 .build();
828
829 SqlCatalog::new(config).await.unwrap()
830 }
831
832 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
833 let _ = catalog
834 .create_namespace(namespace_ident, HashMap::new())
835 .await
836 .unwrap();
837 }
838
839 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
840 for namespace_ident in namespace_idents {
841 let _ = create_namespace(catalog, namespace_ident).await;
842 }
843 }
844
845 fn simple_table_schema() -> Schema {
846 Schema::builder()
847 .with_fields(vec![
848 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
849 ])
850 .build()
851 .unwrap()
852 }
853
854 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
855 let _ = catalog
856 .create_table(
857 &table_ident.namespace,
858 TableCreation::builder()
859 .name(table_ident.name().into())
860 .schema(simple_table_schema())
861 .location(temp_path())
862 .build(),
863 )
864 .await
865 .unwrap();
866 }
867
868 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
869 for table_ident in table_idents {
870 create_table(catalog, table_ident).await;
871 }
872 }
873
874 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
875 assert_eq!(table.identifier(), expected_table_ident);
876
877 let metadata = table.metadata();
878
879 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
880
881 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
882 .with_spec_id(0)
883 .build()
884 .unwrap();
885
886 assert_eq!(
887 metadata
888 .partition_specs_iter()
889 .map(|p| p.as_ref())
890 .collect_vec(),
891 vec![&expected_partition_spec]
892 );
893
894 let expected_sorted_order = SortOrder::builder()
895 .with_order_id(0)
896 .with_fields(vec![])
897 .build(expected_schema)
898 .unwrap();
899
900 assert_eq!(
901 metadata
902 .sort_orders_iter()
903 .map(|s| s.as_ref())
904 .collect_vec(),
905 vec![&expected_sorted_order]
906 );
907
908 assert_eq!(metadata.properties(), &HashMap::new());
909
910 assert!(!table.readonly());
911 }
912
913 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
914 let actual = table.metadata_location().unwrap().to_string();
915 let regex = Regex::new(regex_str).unwrap();
916 assert!(regex.is_match(&actual))
917 }
918
919 #[tokio::test]
920 async fn test_initialized() {
921 let warehouse_loc = temp_path();
922 new_sql_catalog(warehouse_loc.clone()).await;
923 new_sql_catalog(warehouse_loc.clone()).await;
925 new_sql_catalog(warehouse_loc.clone()).await;
926 }
927
928 #[tokio::test]
929 async fn test_list_namespaces_returns_empty_vector() {
930 let warehouse_loc = temp_path();
931 let catalog = new_sql_catalog(warehouse_loc).await;
932
933 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
934 }
935
936 #[tokio::test]
937 async fn test_list_namespaces_returns_multiple_namespaces() {
938 let warehouse_loc = temp_path();
939 let catalog = new_sql_catalog(warehouse_loc).await;
940 let namespace_ident_1 = NamespaceIdent::new("a".into());
941 let namespace_ident_2 = NamespaceIdent::new("b".into());
942 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
943
944 assert_eq!(
945 to_set(catalog.list_namespaces(None).await.unwrap()),
946 to_set(vec![namespace_ident_1, namespace_ident_2])
947 );
948 }
949
950 #[tokio::test]
951 async fn test_list_namespaces_returns_only_top_level_namespaces() {
952 let warehouse_loc = temp_path();
953 let catalog = new_sql_catalog(warehouse_loc).await;
954 let namespace_ident_1 = NamespaceIdent::new("a".into());
955 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
956 let namespace_ident_3 = NamespaceIdent::new("b".into());
957 create_namespaces(&catalog, &vec![
958 &namespace_ident_1,
959 &namespace_ident_2,
960 &namespace_ident_3,
961 ])
962 .await;
963
964 assert_eq!(
965 to_set(catalog.list_namespaces(None).await.unwrap()),
966 to_set(vec![namespace_ident_1, namespace_ident_3])
967 );
968 }
969
970 #[tokio::test]
971 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
972 let warehouse_loc = temp_path();
973 let catalog = new_sql_catalog(warehouse_loc).await;
974 let namespace_ident_1 = NamespaceIdent::new("a".into());
975 let namespace_ident_2 = NamespaceIdent::new("b".into());
976 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
977
978 assert_eq!(
979 catalog
980 .list_namespaces(Some(&namespace_ident_1))
981 .await
982 .unwrap(),
983 vec![]
984 );
985 }
986
987 #[tokio::test]
988 async fn test_list_namespaces_returns_namespace_under_parent() {
989 let warehouse_loc = temp_path();
990 let catalog = new_sql_catalog(warehouse_loc).await;
991 let namespace_ident_1 = NamespaceIdent::new("a".into());
992 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
993 let namespace_ident_3 = NamespaceIdent::new("c".into());
994 create_namespaces(&catalog, &vec![
995 &namespace_ident_1,
996 &namespace_ident_2,
997 &namespace_ident_3,
998 ])
999 .await;
1000
1001 assert_eq!(
1002 to_set(catalog.list_namespaces(None).await.unwrap()),
1003 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1004 );
1005
1006 assert_eq!(
1007 catalog
1008 .list_namespaces(Some(&namespace_ident_1))
1009 .await
1010 .unwrap(),
1011 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1012 );
1013 }
1014
1015 #[tokio::test]
1016 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1017 let warehouse_loc = temp_path();
1018 let catalog = new_sql_catalog(warehouse_loc).await;
1019 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1020 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1021 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1022 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1023 let namespace_ident_5 = NamespaceIdent::new("b".into());
1024 create_namespaces(&catalog, &vec![
1025 &namespace_ident_1,
1026 &namespace_ident_2,
1027 &namespace_ident_3,
1028 &namespace_ident_4,
1029 &namespace_ident_5,
1030 ])
1031 .await;
1032
1033 assert_eq!(
1034 to_set(
1035 catalog
1036 .list_namespaces(Some(&namespace_ident_1))
1037 .await
1038 .unwrap()
1039 ),
1040 to_set(vec![
1041 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1042 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1043 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1044 ])
1045 );
1046 }
1047
1048 #[tokio::test]
1049 async fn test_namespace_exists_returns_false() {
1050 let warehouse_loc = temp_path();
1051 let catalog = new_sql_catalog(warehouse_loc).await;
1052 let namespace_ident = NamespaceIdent::new("a".into());
1053 create_namespace(&catalog, &namespace_ident).await;
1054
1055 assert!(
1056 !catalog
1057 .namespace_exists(&NamespaceIdent::new("b".into()))
1058 .await
1059 .unwrap()
1060 );
1061 }
1062
1063 #[tokio::test]
1064 async fn test_namespace_exists_returns_true() {
1065 let warehouse_loc = temp_path();
1066 let catalog = new_sql_catalog(warehouse_loc).await;
1067 let namespace_ident = NamespaceIdent::new("a".into());
1068 create_namespace(&catalog, &namespace_ident).await;
1069
1070 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1071 }
1072
1073 #[tokio::test]
1074 async fn test_create_namespace_with_properties() {
1075 let warehouse_loc = temp_path();
1076 let catalog = new_sql_catalog(warehouse_loc).await;
1077 let namespace_ident = NamespaceIdent::new("abc".into());
1078
1079 let mut properties = default_properties();
1080 properties.insert("k".into(), "v".into());
1081
1082 assert_eq!(
1083 catalog
1084 .create_namespace(&namespace_ident, properties.clone())
1085 .await
1086 .unwrap(),
1087 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1088 );
1089
1090 assert_eq!(
1091 catalog.get_namespace(&namespace_ident).await.unwrap(),
1092 Namespace::with_properties(namespace_ident, properties)
1093 );
1094 }
1095
1096 #[tokio::test]
1097 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1098 let warehouse_loc = temp_path();
1099 let catalog = new_sql_catalog(warehouse_loc).await;
1100 let namespace_ident = NamespaceIdent::new("a".into());
1101 create_namespace(&catalog, &namespace_ident).await;
1102
1103 assert_eq!(
1104 catalog
1105 .create_namespace(&namespace_ident, HashMap::new())
1106 .await
1107 .unwrap_err()
1108 .to_string(),
1109 format!(
1110 "Unexpected => Namespace {:?} already exists",
1111 &namespace_ident
1112 )
1113 );
1114
1115 assert_eq!(
1116 catalog.get_namespace(&namespace_ident).await.unwrap(),
1117 Namespace::with_properties(namespace_ident, default_properties())
1118 );
1119 }
1120
1121 #[tokio::test]
1122 async fn test_create_nested_namespace() {
1123 let warehouse_loc = temp_path();
1124 let catalog = new_sql_catalog(warehouse_loc).await;
1125 let parent_namespace_ident = NamespaceIdent::new("a".into());
1126 create_namespace(&catalog, &parent_namespace_ident).await;
1127
1128 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1129
1130 assert_eq!(
1131 catalog
1132 .create_namespace(&child_namespace_ident, HashMap::new())
1133 .await
1134 .unwrap(),
1135 Namespace::new(child_namespace_ident.clone())
1136 );
1137
1138 assert_eq!(
1139 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1140 Namespace::with_properties(child_namespace_ident, default_properties())
1141 );
1142 }
1143
1144 #[tokio::test]
1145 async fn test_create_deeply_nested_namespace() {
1146 let warehouse_loc = temp_path();
1147 let catalog = new_sql_catalog(warehouse_loc).await;
1148 let namespace_ident_a = NamespaceIdent::new("a".into());
1149 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1150 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1151
1152 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1153
1154 assert_eq!(
1155 catalog
1156 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1157 .await
1158 .unwrap(),
1159 Namespace::new(namespace_ident_a_b_c.clone())
1160 );
1161
1162 assert_eq!(
1163 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1164 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1165 );
1166 }
1167
1168 #[tokio::test]
1169 async fn test_update_namespace_noop() {
1170 let warehouse_loc = temp_path();
1171 let catalog = new_sql_catalog(warehouse_loc).await;
1172 let namespace_ident = NamespaceIdent::new("a".into());
1173 create_namespace(&catalog, &namespace_ident).await;
1174
1175 catalog
1176 .update_namespace(&namespace_ident, HashMap::new())
1177 .await
1178 .unwrap();
1179
1180 assert_eq!(
1181 *catalog
1182 .get_namespace(&namespace_ident)
1183 .await
1184 .unwrap()
1185 .properties(),
1186 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1187 )
1188 }
1189
1190 #[tokio::test]
1191 async fn test_update_namespace() {
1192 let warehouse_loc = temp_path();
1193 let catalog = new_sql_catalog(warehouse_loc).await;
1194 let namespace_ident = NamespaceIdent::new("a".into());
1195 create_namespace(&catalog, &namespace_ident).await;
1196
1197 let mut props = HashMap::from_iter([
1198 ("prop1".to_string(), "val1".to_string()),
1199 ("prop2".into(), "val2".into()),
1200 ]);
1201
1202 catalog
1203 .update_namespace(&namespace_ident, props.clone())
1204 .await
1205 .unwrap();
1206
1207 props.insert("exists".into(), "true".into());
1208
1209 assert_eq!(
1210 *catalog
1211 .get_namespace(&namespace_ident)
1212 .await
1213 .unwrap()
1214 .properties(),
1215 props
1216 )
1217 }
1218
1219 #[tokio::test]
1220 async fn test_update_nested_namespace() {
1221 let warehouse_loc = temp_path();
1222 let catalog = new_sql_catalog(warehouse_loc).await;
1223 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1224 create_namespace(&catalog, &namespace_ident).await;
1225
1226 let mut props = HashMap::from_iter([
1227 ("prop1".to_string(), "val1".to_string()),
1228 ("prop2".into(), "val2".into()),
1229 ]);
1230
1231 catalog
1232 .update_namespace(&namespace_ident, props.clone())
1233 .await
1234 .unwrap();
1235
1236 props.insert("exists".into(), "true".into());
1237
1238 assert_eq!(
1239 *catalog
1240 .get_namespace(&namespace_ident)
1241 .await
1242 .unwrap()
1243 .properties(),
1244 props
1245 )
1246 }
1247
1248 #[tokio::test]
1249 async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1250 let warehouse_loc = temp_path();
1251 let catalog = new_sql_catalog(warehouse_loc).await;
1252 let namespace_ident = NamespaceIdent::new("a".into());
1253
1254 let props = HashMap::from_iter([
1255 ("prop1".to_string(), "val1".to_string()),
1256 ("prop2".into(), "val2".into()),
1257 ]);
1258
1259 let err = catalog
1260 .update_namespace(&namespace_ident, props)
1261 .await
1262 .unwrap_err();
1263
1264 assert_eq!(
1265 err.message(),
1266 format!("No such namespace: {:?}", namespace_ident)
1267 );
1268 }
1269
1270 #[tokio::test]
1271 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1272 let warehouse_loc = temp_path();
1273 let catalog = new_sql_catalog(warehouse_loc).await;
1274 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1275
1276 let props = HashMap::from_iter([
1277 ("prop1".to_string(), "val1".to_string()),
1278 ("prop2".into(), "val2".into()),
1279 ]);
1280
1281 let err = catalog
1282 .update_namespace(&namespace_ident, props)
1283 .await
1284 .unwrap_err();
1285
1286 assert_eq!(
1287 err.message(),
1288 format!("No such namespace: {:?}", namespace_ident)
1289 );
1290 }
1291
1292 #[tokio::test]
1293 async fn test_drop_namespace() {
1294 let warehouse_loc = temp_path();
1295 let catalog = new_sql_catalog(warehouse_loc).await;
1296 let namespace_ident = NamespaceIdent::new("abc".into());
1297 create_namespace(&catalog, &namespace_ident).await;
1298
1299 catalog.drop_namespace(&namespace_ident).await.unwrap();
1300
1301 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1302 }
1303
1304 #[tokio::test]
1305 async fn test_drop_nested_namespace() {
1306 let warehouse_loc = temp_path();
1307 let catalog = new_sql_catalog(warehouse_loc).await;
1308 let namespace_ident_a = NamespaceIdent::new("a".into());
1309 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1310 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1311
1312 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1313
1314 assert!(
1315 !catalog
1316 .namespace_exists(&namespace_ident_a_b)
1317 .await
1318 .unwrap()
1319 );
1320
1321 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1322 }
1323
1324 #[tokio::test]
1325 async fn test_drop_deeply_nested_namespace() {
1326 let warehouse_loc = temp_path();
1327 let catalog = new_sql_catalog(warehouse_loc).await;
1328 let namespace_ident_a = NamespaceIdent::new("a".into());
1329 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1330 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1331 create_namespaces(&catalog, &vec![
1332 &namespace_ident_a,
1333 &namespace_ident_a_b,
1334 &namespace_ident_a_b_c,
1335 ])
1336 .await;
1337
1338 catalog
1339 .drop_namespace(&namespace_ident_a_b_c)
1340 .await
1341 .unwrap();
1342
1343 assert!(
1344 !catalog
1345 .namespace_exists(&namespace_ident_a_b_c)
1346 .await
1347 .unwrap()
1348 );
1349
1350 assert!(
1351 catalog
1352 .namespace_exists(&namespace_ident_a_b)
1353 .await
1354 .unwrap()
1355 );
1356
1357 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1358 }
1359
1360 #[tokio::test]
1361 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1362 let warehouse_loc = temp_path();
1363 let catalog = new_sql_catalog(warehouse_loc).await;
1364
1365 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1366 assert_eq!(
1367 catalog
1368 .drop_namespace(&non_existent_namespace_ident)
1369 .await
1370 .unwrap_err()
1371 .to_string(),
1372 format!(
1373 "Unexpected => No such namespace: {:?}",
1374 non_existent_namespace_ident
1375 )
1376 )
1377 }
1378
1379 #[tokio::test]
1380 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1381 let warehouse_loc = temp_path();
1382 let catalog = new_sql_catalog(warehouse_loc).await;
1383 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1384
1385 let non_existent_namespace_ident =
1386 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1387 assert_eq!(
1388 catalog
1389 .drop_namespace(&non_existent_namespace_ident)
1390 .await
1391 .unwrap_err()
1392 .to_string(),
1393 format!(
1394 "Unexpected => No such namespace: {:?}",
1395 non_existent_namespace_ident
1396 )
1397 )
1398 }
1399
1400 #[tokio::test]
1401 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1402 let warehouse_loc = temp_path();
1403 let catalog = new_sql_catalog(warehouse_loc).await;
1404 let namespace_ident_a = NamespaceIdent::new("a".into());
1405 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1406 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1407
1408 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1409
1410 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1411
1412 assert!(
1413 catalog
1414 .namespace_exists(&namespace_ident_a_b)
1415 .await
1416 .unwrap()
1417 );
1418 }
1419
1420 #[tokio::test]
1421 async fn test_list_tables_returns_empty_vector() {
1422 let warehouse_loc = temp_path();
1423 let catalog = new_sql_catalog(warehouse_loc).await;
1424 let namespace_ident = NamespaceIdent::new("a".into());
1425 create_namespace(&catalog, &namespace_ident).await;
1426
1427 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1428 }
1429
1430 #[tokio::test]
1431 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1432 let warehouse_loc = temp_path();
1433 let catalog = new_sql_catalog(warehouse_loc).await;
1434
1435 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1436
1437 assert_eq!(
1438 catalog
1439 .list_tables(&non_existent_namespace_ident)
1440 .await
1441 .unwrap_err()
1442 .to_string(),
1443 format!(
1444 "Unexpected => No such namespace: {:?}",
1445 non_existent_namespace_ident
1446 ),
1447 );
1448 }
1449
1450 #[tokio::test]
1451 async fn test_create_table_with_location() {
1452 let warehouse_loc = temp_path();
1453 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1454 let namespace_ident = NamespaceIdent::new("a".into());
1455 create_namespace(&catalog, &namespace_ident).await;
1456
1457 let table_name = "abc";
1458 let location = warehouse_loc.clone();
1459 let table_creation = TableCreation::builder()
1460 .name(table_name.into())
1461 .location(location.clone())
1462 .schema(simple_table_schema())
1463 .build();
1464
1465 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1466
1467 assert_table_eq(
1468 &catalog
1469 .create_table(&namespace_ident, table_creation)
1470 .await
1471 .unwrap(),
1472 &expected_table_ident,
1473 &simple_table_schema(),
1474 );
1475
1476 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1477
1478 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1479
1480 assert!(
1481 table
1482 .metadata_location()
1483 .unwrap()
1484 .to_string()
1485 .starts_with(&location)
1486 )
1487 }
1488
1489 #[tokio::test]
1490 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1491 let warehouse_loc = temp_path();
1492 let catalog = new_sql_catalog(warehouse_loc).await;
1493
1494 let namespace_ident = NamespaceIdent::new("a".into());
1495 let mut namespace_properties = HashMap::new();
1496 let namespace_location = temp_path();
1497 namespace_properties.insert(
1498 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1499 namespace_location.to_string(),
1500 );
1501 catalog
1502 .create_namespace(&namespace_ident, namespace_properties)
1503 .await
1504 .unwrap();
1505
1506 let table_name = "tbl1";
1507 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1508 let expected_table_metadata_location_regex = format!(
1509 "^{}/tbl1/metadata/00000-{}.metadata.json$",
1510 namespace_location, UUID_REGEX_STR,
1511 );
1512
1513 let table = catalog
1514 .create_table(
1515 &namespace_ident,
1516 TableCreation::builder()
1517 .name(table_name.into())
1518 .schema(simple_table_schema())
1519 .build(),
1521 )
1522 .await
1523 .unwrap();
1524 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1525 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1526
1527 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1528 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1529 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1534 {
1535 let warehouse_loc = temp_path();
1536 let catalog = new_sql_catalog(warehouse_loc).await;
1537
1538 let namespace_ident = NamespaceIdent::new("a".into());
1539 let mut namespace_properties = HashMap::new();
1540 let namespace_location = temp_path();
1541 namespace_properties.insert(
1542 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1543 namespace_location.to_string(),
1544 );
1545 catalog
1546 .create_namespace(&namespace_ident, namespace_properties)
1547 .await
1548 .unwrap();
1549
1550 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1551 let mut nested_namespace_properties = HashMap::new();
1552 let nested_namespace_location = temp_path();
1553 nested_namespace_properties.insert(
1554 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1555 nested_namespace_location.to_string(),
1556 );
1557 catalog
1558 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1559 .await
1560 .unwrap();
1561
1562 let table_name = "tbl1";
1563 let expected_table_ident =
1564 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1565 let expected_table_metadata_location_regex = format!(
1566 "^{}/tbl1/metadata/00000-{}.metadata.json$",
1567 nested_namespace_location, UUID_REGEX_STR,
1568 );
1569
1570 let table = catalog
1571 .create_table(
1572 &nested_namespace_ident,
1573 TableCreation::builder()
1574 .name(table_name.into())
1575 .schema(simple_table_schema())
1576 .build(),
1578 )
1579 .await
1580 .unwrap();
1581 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1582 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1583
1584 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1585 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1586 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1587 }
1588
1589 #[tokio::test]
1590 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1591 {
1592 let warehouse_loc = temp_path();
1593 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1594
1595 let namespace_ident = NamespaceIdent::new("a".into());
1596 let namespace_properties = HashMap::new();
1598 catalog
1599 .create_namespace(&namespace_ident, namespace_properties)
1600 .await
1601 .unwrap();
1602
1603 let table_name = "tbl1";
1604 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1605 let expected_table_metadata_location_regex = format!(
1606 "^{}/a/tbl1/metadata/00000-{}.metadata.json$",
1607 warehouse_loc, UUID_REGEX_STR
1608 );
1609
1610 let table = catalog
1611 .create_table(
1612 &namespace_ident,
1613 TableCreation::builder()
1614 .name(table_name.into())
1615 .schema(simple_table_schema())
1616 .build(),
1618 )
1619 .await
1620 .unwrap();
1621 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1622 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1623
1624 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1625 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1626 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1627 }
1628
1629 #[tokio::test]
1630 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1631 {
1632 let warehouse_loc = temp_path();
1633 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1634
1635 let namespace_ident = NamespaceIdent::new("a".into());
1636 create_namespace(&catalog, &namespace_ident).await;
1637
1638 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1639 create_namespace(&catalog, &nested_namespace_ident).await;
1640
1641 let table_name = "tbl1";
1642 let expected_table_ident =
1643 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1644 let expected_table_metadata_location_regex = format!(
1645 "^{}/a/b/tbl1/metadata/00000-{}.metadata.json$",
1646 warehouse_loc, UUID_REGEX_STR
1647 );
1648
1649 let table = catalog
1650 .create_table(
1651 &nested_namespace_ident,
1652 TableCreation::builder()
1653 .name(table_name.into())
1654 .schema(simple_table_schema())
1655 .build(),
1657 )
1658 .await
1659 .unwrap();
1660 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1661 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1662
1663 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1664 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1665 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1666 }
1667
1668 #[tokio::test]
1669 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1670 let warehouse_loc = temp_path();
1671 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1672 let namespace_ident = NamespaceIdent::new("a".into());
1673 create_namespace(&catalog, &namespace_ident).await;
1674 let table_name = "tbl1";
1675 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1676 create_table(&catalog, &table_ident).await;
1677
1678 let tmp_dir = TempDir::new().unwrap();
1679 let location = tmp_dir.path().to_str().unwrap().to_string();
1680
1681 assert_eq!(
1682 catalog
1683 .create_table(
1684 &namespace_ident,
1685 TableCreation::builder()
1686 .name(table_name.into())
1687 .schema(simple_table_schema())
1688 .location(location)
1689 .build()
1690 )
1691 .await
1692 .unwrap_err()
1693 .to_string(),
1694 format!("Unexpected => Table {:?} already exists.", &table_ident)
1695 );
1696 }
1697
1698 #[tokio::test]
1699 async fn test_rename_table_in_same_namespace() {
1700 let warehouse_loc = temp_path();
1701 let catalog = new_sql_catalog(warehouse_loc).await;
1702 let namespace_ident = NamespaceIdent::new("n1".into());
1703 create_namespace(&catalog, &namespace_ident).await;
1704 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1705 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1706 create_table(&catalog, &src_table_ident).await;
1707
1708 catalog
1709 .rename_table(&src_table_ident, &dst_table_ident)
1710 .await
1711 .unwrap();
1712
1713 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1714 dst_table_ident
1715 ],);
1716 }
1717
1718 #[tokio::test]
1719 async fn test_rename_table_across_namespaces() {
1720 let warehouse_loc = temp_path();
1721 let catalog = new_sql_catalog(warehouse_loc).await;
1722 let src_namespace_ident = NamespaceIdent::new("a".into());
1723 let dst_namespace_ident = NamespaceIdent::new("b".into());
1724 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1725 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1726 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1727 create_table(&catalog, &src_table_ident).await;
1728
1729 catalog
1730 .rename_table(&src_table_ident, &dst_table_ident)
1731 .await
1732 .unwrap();
1733
1734 assert_eq!(
1735 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1736 vec![],
1737 );
1738
1739 assert_eq!(
1740 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1741 vec![dst_table_ident],
1742 );
1743 }
1744
1745 #[tokio::test]
1746 async fn test_rename_table_src_table_is_same_as_dst_table() {
1747 let warehouse_loc = temp_path();
1748 let catalog = new_sql_catalog(warehouse_loc).await;
1749 let namespace_ident = NamespaceIdent::new("n1".into());
1750 create_namespace(&catalog, &namespace_ident).await;
1751 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1752 create_table(&catalog, &table_ident).await;
1753
1754 catalog
1755 .rename_table(&table_ident, &table_ident)
1756 .await
1757 .unwrap();
1758
1759 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1760 table_ident
1761 ],);
1762 }
1763
1764 #[tokio::test]
1765 async fn test_rename_table_across_nested_namespaces() {
1766 let warehouse_loc = temp_path();
1767 let catalog = new_sql_catalog(warehouse_loc).await;
1768 let namespace_ident_a = NamespaceIdent::new("a".into());
1769 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1770 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1771 create_namespaces(&catalog, &vec![
1772 &namespace_ident_a,
1773 &namespace_ident_a_b,
1774 &namespace_ident_a_b_c,
1775 ])
1776 .await;
1777
1778 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1779 create_tables(&catalog, vec![&src_table_ident]).await;
1780
1781 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1782 catalog
1783 .rename_table(&src_table_ident, &dst_table_ident)
1784 .await
1785 .unwrap();
1786
1787 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1788
1789 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1790 }
1791
1792 #[tokio::test]
1793 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1794 let warehouse_loc = temp_path();
1795 let catalog = new_sql_catalog(warehouse_loc).await;
1796 let src_namespace_ident = NamespaceIdent::new("n1".into());
1797 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1798 create_namespace(&catalog, &src_namespace_ident).await;
1799 create_table(&catalog, &src_table_ident).await;
1800
1801 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1802 let dst_table_ident =
1803 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1804 assert_eq!(
1805 catalog
1806 .rename_table(&src_table_ident, &dst_table_ident)
1807 .await
1808 .unwrap_err()
1809 .to_string(),
1810 format!(
1811 "Unexpected => No such namespace: {:?}",
1812 non_existent_dst_namespace_ident
1813 ),
1814 );
1815 }
1816
1817 #[tokio::test]
1818 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1819 let warehouse_loc = temp_path();
1820 let catalog = new_sql_catalog(warehouse_loc).await;
1821 let namespace_ident = NamespaceIdent::new("n1".into());
1822 create_namespace(&catalog, &namespace_ident).await;
1823 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1824 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1825
1826 assert_eq!(
1827 catalog
1828 .rename_table(&src_table_ident, &dst_table_ident)
1829 .await
1830 .unwrap_err()
1831 .to_string(),
1832 format!("Unexpected => No such table: {:?}", src_table_ident),
1833 );
1834 }
1835
1836 #[tokio::test]
1837 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1838 let warehouse_loc = temp_path();
1839 let catalog = new_sql_catalog(warehouse_loc).await;
1840 let namespace_ident = NamespaceIdent::new("n1".into());
1841 create_namespace(&catalog, &namespace_ident).await;
1842 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1843 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1844 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1845
1846 assert_eq!(
1847 catalog
1848 .rename_table(&src_table_ident, &dst_table_ident)
1849 .await
1850 .unwrap_err()
1851 .to_string(),
1852 format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
1853 );
1854 }
1855
1856 #[tokio::test]
1857 async fn test_drop_table_throws_error_if_table_not_exist() {
1858 let warehouse_loc = temp_path();
1859 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1860 let namespace_ident = NamespaceIdent::new("a".into());
1861 let table_name = "tbl1";
1862 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1863 create_namespace(&catalog, &namespace_ident).await;
1864
1865 let err = catalog
1866 .drop_table(&table_ident)
1867 .await
1868 .unwrap_err()
1869 .to_string();
1870 assert_eq!(
1871 err,
1872 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
1873 );
1874 }
1875
1876 #[tokio::test]
1877 async fn test_drop_table() {
1878 let warehouse_loc = temp_path();
1879 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1880 let namespace_ident = NamespaceIdent::new("a".into());
1881 let table_name = "tbl1";
1882 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1883 create_namespace(&catalog, &namespace_ident).await;
1884
1885 let location = warehouse_loc.clone();
1886 let table_creation = TableCreation::builder()
1887 .name(table_name.into())
1888 .location(location.clone())
1889 .schema(simple_table_schema())
1890 .build();
1891
1892 catalog
1893 .create_table(&namespace_ident, table_creation)
1894 .await
1895 .unwrap();
1896
1897 let table = catalog.load_table(&table_ident).await.unwrap();
1898 assert_table_eq(&table, &table_ident, &simple_table_schema());
1899
1900 catalog.drop_table(&table_ident).await.unwrap();
1901 let err = catalog
1902 .load_table(&table_ident)
1903 .await
1904 .unwrap_err()
1905 .to_string();
1906 assert_eq!(
1907 err,
1908 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
1909 );
1910 }
1911}