1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29 TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Property key holding the database connection URI of the catalog.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key holding the warehouse location used as the default base for new tables.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key selecting the SQL placeholder style (parsed into `SqlBindStyle`).
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Table that stores one row per catalog table, and its columns.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Record-type value marking a row as a table.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Table that stores namespace properties as key/value rows, and its columns.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property whose value, when set, is the base location for tables in it.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
61static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; static TEST_BEFORE_ACQUIRE: bool = true; #[derive(Debug)]
67pub struct SqlCatalogBuilder {
68 config: SqlCatalogConfig,
69 storage_factory: Option<Arc<dyn StorageFactory>>,
70}
71
72impl Default for SqlCatalogBuilder {
73 fn default() -> Self {
74 Self {
75 config: SqlCatalogConfig {
76 uri: "".to_string(),
77 name: "".to_string(),
78 warehouse_location: "".to_string(),
79 sql_bind_style: SqlBindStyle::DollarNumeric,
80 props: HashMap::new(),
81 },
82 storage_factory: None,
83 }
84 }
85}
86
87impl SqlCatalogBuilder {
88 pub fn uri(mut self, uri: impl Into<String>) -> Self {
93 self.config.uri = uri.into();
94 self
95 }
96
97 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
102 self.config.warehouse_location = location.into();
103 self
104 }
105
106 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
111 self.config.sql_bind_style = sql_bind_style;
112 self
113 }
114
115 pub fn props(mut self, props: HashMap<String, String>) -> Self {
120 for (k, v) in props {
121 self.config.props.insert(k, v);
122 }
123 self
124 }
125
126 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133 self.config.props.insert(key.into(), value.into());
134 self
135 }
136}
137
138impl CatalogBuilder for SqlCatalogBuilder {
139 type C = SqlCatalog;
140
141 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
142 self.storage_factory = Some(storage_factory);
143 self
144 }
145
146 fn load(
147 mut self,
148 name: impl Into<String>,
149 props: HashMap<String, String>,
150 ) -> impl Future<Output = Result<Self::C>> + Send {
151 for (k, v) in props {
152 self.config.props.insert(k, v);
153 }
154
155 if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
156 self.config.uri = uri;
157 }
158 if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
159 self.config.warehouse_location = warehouse_location;
160 }
161
162 let name = name.into();
163
164 let mut valid_sql_bind_style = true;
165 if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
166 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
167 self.config.sql_bind_style = sql_bind_style;
168 } else {
169 valid_sql_bind_style = false;
170 }
171 }
172
173 let valid_name = !name.trim().is_empty();
174
175 async move {
176 if !valid_name {
177 Err(Error::new(
178 ErrorKind::DataInvalid,
179 "Catalog name cannot be empty",
180 ))
181 } else if !valid_sql_bind_style {
182 Err(Error::new(
183 ErrorKind::DataInvalid,
184 format!(
185 "`{}` values are valid only if they're `{}` or `{}`",
186 SQL_CATALOG_PROP_BIND_STYLE,
187 SqlBindStyle::DollarNumeric,
188 SqlBindStyle::QMark
189 ),
190 ))
191 } else {
192 self.config.name = name;
193 SqlCatalog::new(self.config, self.storage_factory).await
194 }
195 }
196 }
197}
198
/// Configuration collected by [`SqlCatalogBuilder`] and consumed by `SqlCatalog::new`.
#[derive(Debug)]
struct SqlCatalogConfig {
    /// Connection URI handed to the sqlx connection pool.
    uri: String,
    /// Catalog name; every row this catalog reads or writes is scoped by it.
    name: String,
    /// Base location for new tables whose creation request and namespace give no location.
    warehouse_location: String,
    /// Placeholder style used when rewriting `?` markers in queries.
    sql_bind_style: SqlBindStyle,
    /// Remaining properties; `pool.max-connections`, `pool.idle-timeout` and
    /// `pool.test-before-acquire` tune the connection pool.
    props: HashMap<String, String>,
}
215
/// Iceberg catalog that keeps table pointers and namespace properties in a SQL
/// database accessed through sqlx's `Any` driver.
#[derive(Debug)]
pub struct SqlCatalog {
    /// Catalog name; every row this catalog reads or writes is scoped by it.
    name: String,
    /// Connection pool to the backing database.
    connection: AnyPool,
    /// Base location for new tables whose creation request and namespace give no location.
    warehouse_location: String,
    /// File IO used to read and write table metadata files.
    fileio: FileIO,
    /// Placeholder style used when rewriting `?` markers in queries.
    sql_bind_style: SqlBindStyle,
}
225
226#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
227pub enum SqlBindStyle {
229 DollarNumeric,
231 QMark,
233}
234
235impl SqlCatalog {
236 async fn new(
238 config: SqlCatalogConfig,
239 storage_factory: Option<Arc<dyn StorageFactory>>,
240 ) -> Result<Self> {
241 let factory = storage_factory.ok_or_else(|| {
242 Error::new(
243 ErrorKind::Unexpected,
244 "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
245 )
246 })?;
247 let fileio = FileIOBuilder::new(factory).build();
248
249 install_default_drivers();
250 let max_connections: u32 = config
251 .props
252 .get("pool.max-connections")
253 .map(|v| v.parse().unwrap())
254 .unwrap_or(MAX_CONNECTIONS);
255 let idle_timeout: u64 = config
256 .props
257 .get("pool.idle-timeout")
258 .map(|v| v.parse().unwrap())
259 .unwrap_or(IDLE_TIMEOUT);
260 let test_before_acquire: bool = config
261 .props
262 .get("pool.test-before-acquire")
263 .map(|v| v.parse().unwrap())
264 .unwrap_or(TEST_BEFORE_ACQUIRE);
265
266 let pool = AnyPoolOptions::new()
267 .max_connections(max_connections)
268 .idle_timeout(Duration::from_secs(idle_timeout))
269 .test_before_acquire(test_before_acquire)
270 .connect(&config.uri)
271 .await
272 .map_err(from_sqlx_error)?;
273
274 sqlx::query(&format!(
275 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
276 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
277 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
278 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
279 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
280 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
281 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
282 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
283 ))
284 .execute(&pool)
285 .await
286 .map_err(from_sqlx_error)?;
287
288 sqlx::query(&format!(
289 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
290 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
291 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
292 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
293 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
294 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
295 ))
296 .execute(&pool)
297 .await
298 .map_err(from_sqlx_error)?;
299
300 Ok(SqlCatalog {
301 name: config.name.to_owned(),
302 connection: pool,
303 warehouse_location: config.warehouse_location,
304 fileio,
305 sql_bind_style: config.sql_bind_style,
306 })
307 }
308
309 fn replace_placeholders(&self, query: &str) -> String {
311 match self.sql_bind_style {
312 SqlBindStyle::DollarNumeric => {
313 let mut count = 1;
314 query
315 .chars()
316 .fold(String::with_capacity(query.len()), |mut acc, c| {
317 if c == '?' {
318 acc.push('$');
319 acc.push_str(&count.to_string());
320 count += 1;
321 } else {
322 acc.push(c);
323 }
324 acc
325 })
326 }
327 _ => query.to_owned(),
328 }
329 }
330
331 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
333 let query_with_placeholders = self.replace_placeholders(query);
334
335 let mut sqlx_query = sqlx::query(&query_with_placeholders);
336 for arg in args {
337 sqlx_query = sqlx_query.bind(arg);
338 }
339
340 sqlx_query
341 .fetch_all(&self.connection)
342 .await
343 .map_err(from_sqlx_error)
344 }
345
346 async fn execute(
348 &self,
349 query: &str,
350 args: Vec<Option<&str>>,
351 transaction: Option<&mut Transaction<'_, Any>>,
352 ) -> Result<AnyQueryResult> {
353 let query_with_placeholders = self.replace_placeholders(query);
354
355 let mut sqlx_query = sqlx::query(&query_with_placeholders);
356 for arg in args {
357 sqlx_query = sqlx_query.bind(arg);
358 }
359
360 match transaction {
361 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
362 None => {
363 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
364 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
365 let _ = tx.commit().await.map_err(from_sqlx_error);
366 result
367 }
368 }
369 }
370}
371
372#[async_trait]
373impl Catalog for SqlCatalog {
374 async fn list_namespaces(
375 &self,
376 parent: Option<&NamespaceIdent>,
377 ) -> Result<Vec<NamespaceIdent>> {
378 let all_namespaces_stmt = format!(
380 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
381 FROM {CATALOG_TABLE_NAME}
382 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
383 UNION
384 SELECT {NAMESPACE_FIELD_NAME}
385 FROM {NAMESPACE_TABLE_NAME}
386 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
387 );
388
389 let namespace_rows = self
390 .fetch_rows(&all_namespaces_stmt, vec![
391 Some(&self.name),
392 Some(&self.name),
393 ])
394 .await?;
395
396 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
397
398 if let Some(parent) = parent {
399 if self.namespace_exists(parent).await? {
400 let parent_str = parent.join(".");
401
402 for row in namespace_rows.iter() {
403 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
404 if nsp != parent_str && nsp.starts_with(&parent_str) {
406 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
407 }
408 }
409
410 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
411 } else {
412 no_such_namespace_err(parent)
413 }
414 } else {
415 for row in namespace_rows.iter() {
416 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
417 let mut levels = nsp.split(".").collect::<Vec<&str>>();
418 if !levels.is_empty() {
419 let first_level = levels.drain(..1).collect::<Vec<&str>>();
420 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
421 }
422 }
423
424 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425 }
426 }
427
428 async fn create_namespace(
429 &self,
430 namespace: &NamespaceIdent,
431 properties: HashMap<String, String>,
432 ) -> Result<Namespace> {
433 let exists = self.namespace_exists(namespace).await?;
434
435 if exists {
436 return Err(Error::new(
437 iceberg::ErrorKind::Unexpected,
438 format!("Namespace {namespace:?} already exists"),
439 ));
440 }
441
442 let namespace_str = namespace.join(".");
443 let insert = format!(
444 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
445 VALUES (?, ?, ?, ?)");
446 if !properties.is_empty() {
447 let mut insert_properties = properties.clone();
448 insert_properties.insert("exists".to_string(), "true".to_string());
449
450 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
451 let mut insert_stmt = insert.clone();
452 for (index, (key, value)) in insert_properties.iter().enumerate() {
453 query_args.extend_from_slice(&[
454 Some(self.name.as_str()),
455 Some(namespace_str.as_str()),
456 Some(key.as_str()),
457 Some(value.as_str()),
458 ]);
459 if index > 0 {
460 insert_stmt.push_str(", (?, ?, ?, ?)");
461 }
462 }
463
464 self.execute(&insert_stmt, query_args, None).await?;
465
466 Ok(Namespace::with_properties(
467 namespace.clone(),
468 insert_properties,
469 ))
470 } else {
471 self.execute(
473 &insert,
474 vec![
475 Some(&self.name),
476 Some(&namespace_str),
477 Some("exists"),
478 Some("true"),
479 ],
480 None,
481 )
482 .await?;
483 Ok(Namespace::with_properties(namespace.clone(), properties))
484 }
485 }
486
487 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
488 let exists = self.namespace_exists(namespace).await?;
489 if exists {
490 let namespace_props = self
491 .fetch_rows(
492 &format!(
493 "SELECT
494 {NAMESPACE_FIELD_NAME},
495 {NAMESPACE_FIELD_PROPERTY_KEY},
496 {NAMESPACE_FIELD_PROPERTY_VALUE}
497 FROM {NAMESPACE_TABLE_NAME}
498 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
499 AND {NAMESPACE_FIELD_NAME} = ?"
500 ),
501 vec![Some(&self.name), Some(&namespace.join("."))],
502 )
503 .await?;
504
505 let mut properties = HashMap::with_capacity(namespace_props.len());
506
507 for row in namespace_props {
508 let key = row
509 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
510 .map_err(from_sqlx_error)?;
511 let value = row
512 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
513 .map_err(from_sqlx_error)?;
514
515 properties.insert(key, value);
516 }
517
518 Ok(Namespace::with_properties(namespace.clone(), properties))
519 } else {
520 no_such_namespace_err(namespace)
521 }
522 }
523
524 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
525 let namespace_str = namespace.join(".");
526
527 let table_namespaces = self
528 .fetch_rows(
529 &format!(
530 "SELECT 1 FROM {CATALOG_TABLE_NAME}
531 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
532 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
533 LIMIT 1"
534 ),
535 vec![Some(&self.name), Some(&namespace_str)],
536 )
537 .await?;
538
539 if !table_namespaces.is_empty() {
540 Ok(true)
541 } else {
542 let namespaces = self
543 .fetch_rows(
544 &format!(
545 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
546 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
547 AND {NAMESPACE_FIELD_NAME} = ?
548 LIMIT 1"
549 ),
550 vec![Some(&self.name), Some(&namespace_str)],
551 )
552 .await?;
553 if !namespaces.is_empty() {
554 Ok(true)
555 } else {
556 Ok(false)
557 }
558 }
559 }
560
561 async fn update_namespace(
562 &self,
563 namespace: &NamespaceIdent,
564 properties: HashMap<String, String>,
565 ) -> Result<()> {
566 let exists = self.namespace_exists(namespace).await?;
567 if exists {
568 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
569 let namespace_str = namespace.join(".");
570
571 let mut updates = vec![];
572 let mut inserts = vec![];
573
574 for (key, value) in properties.iter() {
575 if existing_properties.contains_key(key) {
576 if existing_properties.get(key) != Some(value) {
577 updates.push((key, value));
578 }
579 } else {
580 inserts.push((key, value));
581 }
582 }
583
584 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
585 let update_stmt = format!(
586 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
587 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
588 AND {NAMESPACE_FIELD_NAME} = ?
589 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
590 );
591
592 let insert_stmt = format!(
593 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
594 VALUES (?, ?, ?, ?)"
595 );
596
597 for (key, value) in updates {
598 self.execute(
599 &update_stmt,
600 vec![
601 Some(value),
602 Some(&self.name),
603 Some(&namespace_str),
604 Some(key),
605 ],
606 Some(&mut tx),
607 )
608 .await?;
609 }
610
611 for (key, value) in inserts {
612 self.execute(
613 &insert_stmt,
614 vec![
615 Some(&self.name),
616 Some(&namespace_str),
617 Some(key),
618 Some(value),
619 ],
620 Some(&mut tx),
621 )
622 .await?;
623 }
624
625 let _ = tx.commit().await.map_err(from_sqlx_error)?;
626
627 Ok(())
628 } else {
629 no_such_namespace_err(namespace)
630 }
631 }
632
633 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
634 let exists = self.namespace_exists(namespace).await?;
635 if exists {
636 let tables = self.list_tables(namespace).await?;
638 if !tables.is_empty() {
639 return Err(Error::new(
640 iceberg::ErrorKind::Unexpected,
641 format!(
642 "Namespace {:?} is not empty. {} tables exist.",
643 namespace,
644 tables.len()
645 ),
646 ));
647 }
648
649 self.execute(
650 &format!(
651 "DELETE FROM {NAMESPACE_TABLE_NAME}
652 WHERE {NAMESPACE_FIELD_NAME} = ?
653 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
654 ),
655 vec![Some(&namespace.join(".")), Some(&self.name)],
656 None,
657 )
658 .await?;
659
660 Ok(())
661 } else {
662 no_such_namespace_err(namespace)
663 }
664 }
665
666 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
667 let exists = self.namespace_exists(namespace).await?;
668 if exists {
669 let rows = self
670 .fetch_rows(
671 &format!(
672 "SELECT {CATALOG_FIELD_TABLE_NAME},
673 {CATALOG_FIELD_TABLE_NAMESPACE}
674 FROM {CATALOG_TABLE_NAME}
675 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
676 AND {CATALOG_FIELD_CATALOG_NAME} = ?
677 AND (
678 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
679 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
680 )",
681 ),
682 vec![Some(&namespace.join(".")), Some(&self.name)],
683 )
684 .await?;
685
686 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
687
688 for row in rows.iter() {
689 let tbl = row
690 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
691 .map_err(from_sqlx_error)?;
692 let ns_strs = row
693 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
694 .map_err(from_sqlx_error)?;
695 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
696 tables.insert(TableIdent::new(ns, tbl));
697 }
698
699 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
700 } else {
701 no_such_namespace_err(namespace)
702 }
703 }
704
705 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
706 let namespace = identifier.namespace().join(".");
707 let table_name = identifier.name();
708 let table_counts = self
709 .fetch_rows(
710 &format!(
711 "SELECT 1
712 FROM {CATALOG_TABLE_NAME}
713 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
714 AND {CATALOG_FIELD_CATALOG_NAME} = ?
715 AND {CATALOG_FIELD_TABLE_NAME} = ?
716 AND (
717 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
718 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
719 )"
720 ),
721 vec![Some(&namespace), Some(&self.name), Some(table_name)],
722 )
723 .await?;
724
725 if !table_counts.is_empty() {
726 Ok(true)
727 } else {
728 Ok(false)
729 }
730 }
731
732 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
733 if !self.table_exists(identifier).await? {
734 return no_such_table_err(identifier);
735 }
736
737 self.execute(
738 &format!(
739 "DELETE FROM {CATALOG_TABLE_NAME}
740 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
741 AND {CATALOG_FIELD_TABLE_NAME} = ?
742 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
743 AND (
744 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
745 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
746 )"
747 ),
748 vec![
749 Some(&self.name),
750 Some(identifier.name()),
751 Some(&identifier.namespace().join(".")),
752 ],
753 None,
754 )
755 .await?;
756
757 Ok(())
758 }
759
760 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
761 if !self.table_exists(identifier).await? {
762 return no_such_table_err(identifier);
763 }
764
765 let rows = self
766 .fetch_rows(
767 &format!(
768 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
769 FROM {CATALOG_TABLE_NAME}
770 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
771 AND {CATALOG_FIELD_TABLE_NAME} = ?
772 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
773 AND (
774 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
775 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
776 )"
777 ),
778 vec![
779 Some(&self.name),
780 Some(identifier.name()),
781 Some(&identifier.namespace().join(".")),
782 ],
783 )
784 .await?;
785
786 if rows.is_empty() {
787 return no_such_table_err(identifier);
788 }
789
790 let row = &rows[0];
791 let tbl_metadata_location = row
792 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
793 .map_err(from_sqlx_error)?;
794
795 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
796
797 Ok(Table::builder()
798 .file_io(self.fileio.clone())
799 .identifier(identifier.clone())
800 .metadata_location(tbl_metadata_location)
801 .metadata(metadata)
802 .build()?)
803 }
804
805 async fn create_table(
806 &self,
807 namespace: &NamespaceIdent,
808 creation: TableCreation,
809 ) -> Result<Table> {
810 if !self.namespace_exists(namespace).await? {
811 return no_such_namespace_err(namespace);
812 }
813
814 let tbl_name = creation.name.clone();
815 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
816
817 if self.table_exists(&tbl_ident).await? {
818 return table_already_exists_err(&tbl_ident);
819 }
820
821 let (tbl_creation, location) = match creation.location.clone() {
822 Some(location) => (creation, location),
823 None => {
824 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
827 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
828 Some(location) => location.clone(),
829 None => {
830 format!(
831 "{}/{}",
832 self.warehouse_location.clone(),
833 namespace.join("/")
834 )
835 }
836 };
837
838 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
839
840 (
841 TableCreation {
842 location: Some(tbl_location.clone()),
843 ..creation
844 },
845 tbl_location,
846 )
847 }
848 };
849
850 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
851 .build()?
852 .metadata;
853 let tbl_metadata_location =
854 MetadataLocation::new_with_table_location(location.clone()).to_string();
855
856 tbl_metadata
857 .write_to(&self.fileio, &tbl_metadata_location)
858 .await?;
859
860 self.execute(&format!(
861 "INSERT INTO {CATALOG_TABLE_NAME}
862 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
863 VALUES (?, ?, ?, ?, ?)
864 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
865
866 Ok(Table::builder()
867 .file_io(self.fileio.clone())
868 .metadata_location(tbl_metadata_location)
869 .identifier(tbl_ident)
870 .metadata(tbl_metadata)
871 .build()?)
872 }
873
874 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
875 if src == dest {
876 return Ok(());
877 }
878
879 if !self.table_exists(src).await? {
880 return no_such_table_err(src);
881 }
882
883 if !self.namespace_exists(dest.namespace()).await? {
884 return no_such_namespace_err(dest.namespace());
885 }
886
887 if self.table_exists(dest).await? {
888 return table_already_exists_err(dest);
889 }
890
891 self.execute(
892 &format!(
893 "UPDATE {CATALOG_TABLE_NAME}
894 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
895 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
896 AND {CATALOG_FIELD_TABLE_NAME} = ?
897 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
898 AND (
899 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
900 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
901 )"
902 ),
903 vec![
904 Some(dest.name()),
905 Some(&dest.namespace().join(".")),
906 Some(&self.name),
907 Some(src.name()),
908 Some(&src.namespace().join(".")),
909 ],
910 None,
911 )
912 .await?;
913
914 Ok(())
915 }
916
917 async fn register_table(
918 &self,
919 table_ident: &TableIdent,
920 metadata_location: String,
921 ) -> Result<Table> {
922 if self.table_exists(table_ident).await? {
923 return table_already_exists_err(table_ident);
924 }
925
926 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
927
928 let namespace = table_ident.namespace();
929 let tbl_name = table_ident.name().to_string();
930
931 self.execute(&format!(
932 "INSERT INTO {CATALOG_TABLE_NAME}
933 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
934 VALUES (?, ?, ?, ?, ?)
935 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
936
937 Ok(Table::builder()
938 .identifier(table_ident.clone())
939 .metadata_location(metadata_location)
940 .metadata(metadata)
941 .file_io(self.fileio.clone())
942 .build()?)
943 }
944
945 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
947 let table_ident = commit.identifier().clone();
948 let current_table = self.load_table(&table_ident).await?;
949 let current_metadata_location = current_table.metadata_location_result()?.to_string();
950
951 let staged_table = commit.apply(current_table)?;
952 let staged_metadata_location = staged_table.metadata_location_result()?;
953
954 staged_table
955 .metadata()
956 .write_to(staged_table.file_io(), &staged_metadata_location)
957 .await?;
958
959 let update_result = self
960 .execute(
961 &format!(
962 "UPDATE {CATALOG_TABLE_NAME}
963 SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
964 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
965 AND {CATALOG_FIELD_TABLE_NAME} = ?
966 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
967 AND (
968 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
969 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
970 )
971 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
972 ),
973 vec![
974 Some(staged_metadata_location),
975 Some(current_metadata_location.as_str()),
976 Some(&self.name),
977 Some(table_ident.name()),
978 Some(&table_ident.namespace().join(".")),
979 Some(current_metadata_location.as_str()),
980 ],
981 None,
982 )
983 .await?;
984
985 if update_result.rows_affected() == 0 {
986 return Err(Error::new(
987 ErrorKind::CatalogCommitConflicts,
988 format!("Commit conflicted for table: {table_ident}"),
989 )
990 .with_retryable(true));
991 }
992
993 Ok(staged_table)
994 }
995}
996
997#[cfg(test)]
998mod tests {
999 use std::collections::{HashMap, HashSet};
1000 use std::hash::Hash;
1001 use std::sync::Arc;
1002
1003 use iceberg::io::LocalFsStorageFactory;
1004 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1005 use iceberg::table::Table;
1006 use iceberg::transaction::{ApplyTransactionAction, Transaction};
1007 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1008 use itertools::Itertools;
1009 use regex::Regex;
1010 use sqlx::migrate::MigrateDatabase;
1011 use tempfile::TempDir;
1012
1013 use crate::catalog::{
1014 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1015 SQL_CATALOG_PROP_WAREHOUSE,
1016 };
1017 use crate::{SqlBindStyle, SqlCatalogBuilder};
1018
1019 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1020
1021 fn temp_path() -> String {
1022 let temp_dir = TempDir::new().unwrap();
1023 temp_dir.path().to_str().unwrap().to_string()
1024 }
1025
1026 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1027 HashSet::from_iter(vec)
1028 }
1029
1030 fn default_properties() -> HashMap<String, String> {
1031 HashMap::from([("exists".to_string(), "true".to_string())])
1032 }
1033
1034 async fn new_sql_catalog(
1036 warehouse_location: String,
1037 name: Option<impl ToString>,
1038 ) -> impl Catalog {
1039 let name = if let Some(name) = name {
1040 name.to_string()
1041 } else {
1042 "iceberg".to_string()
1043 };
1044 let sql_lite_uri = format!("sqlite:{}", temp_path());
1045 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1046
1047 let props = HashMap::from_iter([
1048 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1049 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1050 (
1051 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1052 SqlBindStyle::DollarNumeric.to_string(),
1053 ),
1054 ]);
1055 SqlCatalogBuilder::default()
1056 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1057 .load(&name, props)
1058 .await
1059 .unwrap()
1060 }
1061
1062 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1063 let _ = catalog
1064 .create_namespace(namespace_ident, HashMap::new())
1065 .await
1066 .unwrap();
1067 }
1068
1069 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1070 for namespace_ident in namespace_idents {
1071 let _ = create_namespace(catalog, namespace_ident).await;
1072 }
1073 }
1074
1075 fn simple_table_schema() -> Schema {
1076 Schema::builder()
1077 .with_fields(vec![
1078 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1079 ])
1080 .build()
1081 .unwrap()
1082 }
1083
1084 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1085 let _ = catalog
1086 .create_table(
1087 &table_ident.namespace,
1088 TableCreation::builder()
1089 .name(table_ident.name().into())
1090 .schema(simple_table_schema())
1091 .location(temp_path())
1092 .build(),
1093 )
1094 .await
1095 .unwrap();
1096 }
1097
1098 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1099 for table_ident in table_idents {
1100 create_table(catalog, table_ident).await;
1101 }
1102 }
1103
1104 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1105 assert_eq!(table.identifier(), expected_table_ident);
1106
1107 let metadata = table.metadata();
1108
1109 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1110
1111 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1112 .with_spec_id(0)
1113 .build()
1114 .unwrap();
1115
1116 assert_eq!(
1117 metadata
1118 .partition_specs_iter()
1119 .map(|p| p.as_ref())
1120 .collect_vec(),
1121 vec![&expected_partition_spec]
1122 );
1123
1124 let expected_sorted_order = SortOrder::builder()
1125 .with_order_id(0)
1126 .with_fields(vec![])
1127 .build(expected_schema)
1128 .unwrap();
1129
1130 assert_eq!(
1131 metadata
1132 .sort_orders_iter()
1133 .map(|s| s.as_ref())
1134 .collect_vec(),
1135 vec![&expected_sorted_order]
1136 );
1137
1138 assert_eq!(metadata.properties(), &HashMap::new());
1139
1140 assert!(!table.readonly());
1141 }
1142
1143 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1144 let actual = table.metadata_location().unwrap().to_string();
1145 let regex = Regex::new(regex_str).unwrap();
1146 assert!(regex.is_match(&actual))
1147 }
1148
1149 #[tokio::test]
1150 async fn test_initialized() {
1151 let warehouse_loc = temp_path();
1152 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1153 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1155 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1156 }
1157
1158 #[tokio::test]
1159 async fn test_builder_method() {
1160 let sql_lite_uri = format!("sqlite:{}", temp_path());
1161 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1162 let warehouse_location = temp_path();
1163
1164 let catalog = SqlCatalogBuilder::default()
1165 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1166 .uri(sql_lite_uri.to_string())
1167 .warehouse_location(warehouse_location.clone())
1168 .sql_bind_style(SqlBindStyle::QMark)
1169 .load("iceberg", HashMap::default())
1170 .await;
1171 assert!(catalog.is_ok());
1172
1173 let catalog = catalog.unwrap();
1174 assert!(catalog.warehouse_location == warehouse_location);
1175 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1176 }
1177
1178 #[tokio::test]
1181 async fn test_builder_props_non_existent_path_fails() {
1182 let sql_lite_uri = format!("sqlite:{}", temp_path());
1183 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1184 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1185 let warehouse_location = temp_path();
1186
1187 let catalog = SqlCatalogBuilder::default()
1188 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1189 .uri(sql_lite_uri)
1190 .warehouse_location(warehouse_location)
1191 .load(
1192 "iceberg",
1193 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1194 )
1195 .await;
1196 assert!(catalog.is_err());
1197 }
1198
1199 #[tokio::test]
1203 async fn test_builder_props_set_valid_uri() {
1204 let sql_lite_uri = format!("sqlite:{}", temp_path());
1205 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1206 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1207 let warehouse_location = temp_path();
1208
1209 let catalog = SqlCatalogBuilder::default()
1210 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1211 .uri(sql_lite_uri2)
1212 .warehouse_location(warehouse_location)
1213 .load(
1214 "iceberg",
1215 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1216 )
1217 .await;
1218 assert!(catalog.is_ok());
1219 }
1220
1221 #[tokio::test]
1223 async fn test_builder_props_take_precedence() {
1224 let sql_lite_uri = format!("sqlite:{}", temp_path());
1225 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1226 let warehouse_location = temp_path();
1227 let warehouse_location2 = temp_path();
1228
1229 let catalog = SqlCatalogBuilder::default()
1230 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1231 .warehouse_location(warehouse_location2)
1232 .sql_bind_style(SqlBindStyle::DollarNumeric)
1233 .load(
1234 "iceberg",
1235 HashMap::from_iter([
1236 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1237 (
1238 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1239 warehouse_location.clone(),
1240 ),
1241 (
1242 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1243 SqlBindStyle::QMark.to_string(),
1244 ),
1245 ]),
1246 )
1247 .await;
1248
1249 assert!(catalog.is_ok());
1250
1251 let catalog = catalog.unwrap();
1252 assert!(catalog.warehouse_location == warehouse_location);
1253 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1254 }
1255
1256 #[tokio::test]
1258 async fn test_builder_props_take_precedence_props() {
1259 let sql_lite_uri = format!("sqlite:{}", temp_path());
1260 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1261 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1262 let warehouse_location = temp_path();
1263 let warehouse_location2 = temp_path();
1264
1265 let props = HashMap::from_iter([
1266 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1267 (
1268 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1269 warehouse_location.clone(),
1270 ),
1271 (
1272 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1273 SqlBindStyle::QMark.to_string(),
1274 ),
1275 ]);
1276 let props2 = HashMap::from_iter([
1277 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1278 (
1279 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1280 warehouse_location2.clone(),
1281 ),
1282 (
1283 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1284 SqlBindStyle::DollarNumeric.to_string(),
1285 ),
1286 ]);
1287
1288 let catalog = SqlCatalogBuilder::default()
1289 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1290 .props(props2)
1291 .load("iceberg", props)
1292 .await;
1293
1294 assert!(catalog.is_ok());
1295
1296 let catalog = catalog.unwrap();
1297 assert!(catalog.warehouse_location == warehouse_location);
1298 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1299 }
1300
1301 #[tokio::test]
1303 async fn test_builder_props_take_precedence_prop() {
1304 let sql_lite_uri = format!("sqlite:{}", temp_path());
1305 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1306 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1307 let warehouse_location = temp_path();
1308 let warehouse_location2 = temp_path();
1309
1310 let props = HashMap::from_iter([
1311 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1312 (
1313 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1314 warehouse_location.clone(),
1315 ),
1316 (
1317 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1318 SqlBindStyle::QMark.to_string(),
1319 ),
1320 ]);
1321
1322 let catalog = SqlCatalogBuilder::default()
1323 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1324 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1325 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1326 .prop(
1327 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1328 SqlBindStyle::DollarNumeric.to_string(),
1329 )
1330 .load("iceberg", props)
1331 .await;
1332
1333 assert!(catalog.is_ok());
1334
1335 let catalog = catalog.unwrap();
1336 assert!(catalog.warehouse_location == warehouse_location);
1337 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1338 }
1339
1340 #[tokio::test]
1342 async fn test_builder_props_invalid_bind_style_fails() {
1343 let sql_lite_uri = format!("sqlite:{}", temp_path());
1344 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1345 let warehouse_location = temp_path();
1346
1347 let catalog = SqlCatalogBuilder::default()
1348 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1349 .load(
1350 "iceberg",
1351 HashMap::from_iter([
1352 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1353 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1354 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1355 ]),
1356 )
1357 .await;
1358
1359 assert!(catalog.is_err());
1360 }
1361
1362 #[tokio::test]
1363 async fn test_list_namespaces_returns_empty_vector() {
1364 let warehouse_loc = temp_path();
1365 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1366
1367 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1368 }
1369
1370 #[tokio::test]
1371 async fn test_list_namespaces_returns_empty_different_name() {
1372 let warehouse_loc = temp_path();
1373 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1374 let namespace_ident_1 = NamespaceIdent::new("a".into());
1375 let namespace_ident_2 = NamespaceIdent::new("b".into());
1376 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1377 assert_eq!(
1378 to_set(catalog.list_namespaces(None).await.unwrap()),
1379 to_set(vec![namespace_ident_1, namespace_ident_2])
1380 );
1381
1382 let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1383 assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1384 }
1385
1386 #[tokio::test]
1387 async fn test_list_namespaces_returns_multiple_namespaces() {
1388 let warehouse_loc = temp_path();
1389 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1390 let namespace_ident_1 = NamespaceIdent::new("a".into());
1391 let namespace_ident_2 = NamespaceIdent::new("b".into());
1392 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1393
1394 assert_eq!(
1395 to_set(catalog.list_namespaces(None).await.unwrap()),
1396 to_set(vec![namespace_ident_1, namespace_ident_2])
1397 );
1398 }
1399
1400 #[tokio::test]
1401 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1402 let warehouse_loc = temp_path();
1403 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1404 let namespace_ident_1 = NamespaceIdent::new("a".into());
1405 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1406 let namespace_ident_3 = NamespaceIdent::new("b".into());
1407 create_namespaces(&catalog, &vec![
1408 &namespace_ident_1,
1409 &namespace_ident_2,
1410 &namespace_ident_3,
1411 ])
1412 .await;
1413
1414 assert_eq!(
1415 to_set(catalog.list_namespaces(None).await.unwrap()),
1416 to_set(vec![namespace_ident_1, namespace_ident_3])
1417 );
1418 }
1419
1420 #[tokio::test]
1421 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1422 let warehouse_loc = temp_path();
1423 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1424 let namespace_ident_1 = NamespaceIdent::new("a".into());
1425 let namespace_ident_2 = NamespaceIdent::new("b".into());
1426 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1427
1428 assert_eq!(
1429 catalog
1430 .list_namespaces(Some(&namespace_ident_1))
1431 .await
1432 .unwrap(),
1433 vec![]
1434 );
1435 }
1436
1437 #[tokio::test]
1438 async fn test_list_namespaces_returns_namespace_under_parent() {
1439 let warehouse_loc = temp_path();
1440 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1441 let namespace_ident_1 = NamespaceIdent::new("a".into());
1442 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1443 let namespace_ident_3 = NamespaceIdent::new("c".into());
1444 create_namespaces(&catalog, &vec![
1445 &namespace_ident_1,
1446 &namespace_ident_2,
1447 &namespace_ident_3,
1448 ])
1449 .await;
1450
1451 assert_eq!(
1452 to_set(catalog.list_namespaces(None).await.unwrap()),
1453 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1454 );
1455
1456 assert_eq!(
1457 catalog
1458 .list_namespaces(Some(&namespace_ident_1))
1459 .await
1460 .unwrap(),
1461 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1462 );
1463 }
1464
1465 #[tokio::test]
1466 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1467 let warehouse_loc = temp_path();
1468 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1469 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1470 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1471 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1472 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1473 let namespace_ident_5 = NamespaceIdent::new("b".into());
1474 create_namespaces(&catalog, &vec![
1475 &namespace_ident_1,
1476 &namespace_ident_2,
1477 &namespace_ident_3,
1478 &namespace_ident_4,
1479 &namespace_ident_5,
1480 ])
1481 .await;
1482
1483 assert_eq!(
1484 to_set(
1485 catalog
1486 .list_namespaces(Some(&namespace_ident_1))
1487 .await
1488 .unwrap()
1489 ),
1490 to_set(vec![
1491 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1492 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1493 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1494 ])
1495 );
1496 }
1497
1498 #[tokio::test]
1499 async fn test_namespace_exists_returns_false() {
1500 let warehouse_loc = temp_path();
1501 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1502 let namespace_ident = NamespaceIdent::new("a".into());
1503 create_namespace(&catalog, &namespace_ident).await;
1504
1505 assert!(
1506 !catalog
1507 .namespace_exists(&NamespaceIdent::new("b".into()))
1508 .await
1509 .unwrap()
1510 );
1511 }
1512
1513 #[tokio::test]
1514 async fn test_namespace_exists_returns_true() {
1515 let warehouse_loc = temp_path();
1516 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1517 let namespace_ident = NamespaceIdent::new("a".into());
1518 create_namespace(&catalog, &namespace_ident).await;
1519
1520 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1521 }
1522
1523 #[tokio::test]
1524 async fn test_create_namespace_with_properties() {
1525 let warehouse_loc = temp_path();
1526 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1527 let namespace_ident = NamespaceIdent::new("abc".into());
1528
1529 let mut properties = default_properties();
1530 properties.insert("k".into(), "v".into());
1531
1532 assert_eq!(
1533 catalog
1534 .create_namespace(&namespace_ident, properties.clone())
1535 .await
1536 .unwrap(),
1537 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1538 );
1539
1540 assert_eq!(
1541 catalog.get_namespace(&namespace_ident).await.unwrap(),
1542 Namespace::with_properties(namespace_ident, properties)
1543 );
1544 }
1545
1546 #[tokio::test]
1547 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1548 let warehouse_loc = temp_path();
1549 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1550 let namespace_ident = NamespaceIdent::new("a".into());
1551 create_namespace(&catalog, &namespace_ident).await;
1552
1553 assert_eq!(
1554 catalog
1555 .create_namespace(&namespace_ident, HashMap::new())
1556 .await
1557 .unwrap_err()
1558 .to_string(),
1559 format!(
1560 "Unexpected => Namespace {:?} already exists",
1561 &namespace_ident
1562 )
1563 );
1564
1565 assert_eq!(
1566 catalog.get_namespace(&namespace_ident).await.unwrap(),
1567 Namespace::with_properties(namespace_ident, default_properties())
1568 );
1569 }
1570
1571 #[tokio::test]
1572 async fn test_create_nested_namespace() {
1573 let warehouse_loc = temp_path();
1574 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1575 let parent_namespace_ident = NamespaceIdent::new("a".into());
1576 create_namespace(&catalog, &parent_namespace_ident).await;
1577
1578 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1579
1580 assert_eq!(
1581 catalog
1582 .create_namespace(&child_namespace_ident, HashMap::new())
1583 .await
1584 .unwrap(),
1585 Namespace::new(child_namespace_ident.clone())
1586 );
1587
1588 assert_eq!(
1589 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1590 Namespace::with_properties(child_namespace_ident, default_properties())
1591 );
1592 }
1593
1594 #[tokio::test]
1595 async fn test_create_deeply_nested_namespace() {
1596 let warehouse_loc = temp_path();
1597 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1598 let namespace_ident_a = NamespaceIdent::new("a".into());
1599 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1600 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1601
1602 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1603
1604 assert_eq!(
1605 catalog
1606 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1607 .await
1608 .unwrap(),
1609 Namespace::new(namespace_ident_a_b_c.clone())
1610 );
1611
1612 assert_eq!(
1613 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1614 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1615 );
1616 }
1617
1618 #[tokio::test]
1619 async fn test_update_namespace_noop() {
1620 let warehouse_loc = temp_path();
1621 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1622 let namespace_ident = NamespaceIdent::new("a".into());
1623 create_namespace(&catalog, &namespace_ident).await;
1624
1625 catalog
1626 .update_namespace(&namespace_ident, HashMap::new())
1627 .await
1628 .unwrap();
1629
1630 assert_eq!(
1631 *catalog
1632 .get_namespace(&namespace_ident)
1633 .await
1634 .unwrap()
1635 .properties(),
1636 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1637 )
1638 }
1639
1640 #[tokio::test]
1641 async fn test_update_namespace() {
1642 let warehouse_loc = temp_path();
1643 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1644 let namespace_ident = NamespaceIdent::new("a".into());
1645 create_namespace(&catalog, &namespace_ident).await;
1646
1647 let mut props = HashMap::from_iter([
1648 ("prop1".to_string(), "val1".to_string()),
1649 ("prop2".into(), "val2".into()),
1650 ]);
1651
1652 catalog
1653 .update_namespace(&namespace_ident, props.clone())
1654 .await
1655 .unwrap();
1656
1657 props.insert("exists".into(), "true".into());
1658
1659 assert_eq!(
1660 *catalog
1661 .get_namespace(&namespace_ident)
1662 .await
1663 .unwrap()
1664 .properties(),
1665 props
1666 )
1667 }
1668
1669 #[tokio::test]
1670 async fn test_update_nested_namespace() {
1671 let warehouse_loc = temp_path();
1672 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1673 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1674 create_namespace(&catalog, &namespace_ident).await;
1675
1676 let mut props = HashMap::from_iter([
1677 ("prop1".to_string(), "val1".to_string()),
1678 ("prop2".into(), "val2".into()),
1679 ]);
1680
1681 catalog
1682 .update_namespace(&namespace_ident, props.clone())
1683 .await
1684 .unwrap();
1685
1686 props.insert("exists".into(), "true".into());
1687
1688 assert_eq!(
1689 *catalog
1690 .get_namespace(&namespace_ident)
1691 .await
1692 .unwrap()
1693 .properties(),
1694 props
1695 )
1696 }
1697
1698 #[tokio::test]
1699 async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1700 let warehouse_loc = temp_path();
1701 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1702 let namespace_ident = NamespaceIdent::new("a".into());
1703
1704 let props = HashMap::from_iter([
1705 ("prop1".to_string(), "val1".to_string()),
1706 ("prop2".into(), "val2".into()),
1707 ]);
1708
1709 let err = catalog
1710 .update_namespace(&namespace_ident, props)
1711 .await
1712 .unwrap_err();
1713
1714 assert_eq!(
1715 err.message(),
1716 format!("No such namespace: {namespace_ident:?}")
1717 );
1718 }
1719
1720 #[tokio::test]
1721 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1722 let warehouse_loc = temp_path();
1723 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1724 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1725
1726 let props = HashMap::from_iter([
1727 ("prop1".to_string(), "val1".to_string()),
1728 ("prop2".into(), "val2".into()),
1729 ]);
1730
1731 let err = catalog
1732 .update_namespace(&namespace_ident, props)
1733 .await
1734 .unwrap_err();
1735
1736 assert_eq!(
1737 err.message(),
1738 format!("No such namespace: {namespace_ident:?}")
1739 );
1740 }
1741
1742 #[tokio::test]
1743 async fn test_drop_namespace() {
1744 let warehouse_loc = temp_path();
1745 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1746 let namespace_ident = NamespaceIdent::new("abc".into());
1747 create_namespace(&catalog, &namespace_ident).await;
1748
1749 catalog.drop_namespace(&namespace_ident).await.unwrap();
1750
1751 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1752 }
1753
1754 #[tokio::test]
1755 async fn test_drop_nested_namespace() {
1756 let warehouse_loc = temp_path();
1757 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1758 let namespace_ident_a = NamespaceIdent::new("a".into());
1759 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1760 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1761
1762 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1763
1764 assert!(
1765 !catalog
1766 .namespace_exists(&namespace_ident_a_b)
1767 .await
1768 .unwrap()
1769 );
1770
1771 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1772 }
1773
1774 #[tokio::test]
1775 async fn test_drop_deeply_nested_namespace() {
1776 let warehouse_loc = temp_path();
1777 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1778 let namespace_ident_a = NamespaceIdent::new("a".into());
1779 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1780 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1781 create_namespaces(&catalog, &vec![
1782 &namespace_ident_a,
1783 &namespace_ident_a_b,
1784 &namespace_ident_a_b_c,
1785 ])
1786 .await;
1787
1788 catalog
1789 .drop_namespace(&namespace_ident_a_b_c)
1790 .await
1791 .unwrap();
1792
1793 assert!(
1794 !catalog
1795 .namespace_exists(&namespace_ident_a_b_c)
1796 .await
1797 .unwrap()
1798 );
1799
1800 assert!(
1801 catalog
1802 .namespace_exists(&namespace_ident_a_b)
1803 .await
1804 .unwrap()
1805 );
1806
1807 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1808 }
1809
1810 #[tokio::test]
1811 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1812 let warehouse_loc = temp_path();
1813 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1814
1815 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1816 assert_eq!(
1817 catalog
1818 .drop_namespace(&non_existent_namespace_ident)
1819 .await
1820 .unwrap_err()
1821 .to_string(),
1822 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1823 )
1824 }
1825
1826 #[tokio::test]
1827 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1828 let warehouse_loc = temp_path();
1829 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1830 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1831
1832 let non_existent_namespace_ident =
1833 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1834 assert_eq!(
1835 catalog
1836 .drop_namespace(&non_existent_namespace_ident)
1837 .await
1838 .unwrap_err()
1839 .to_string(),
1840 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1841 )
1842 }
1843
1844 #[tokio::test]
1845 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1846 let warehouse_loc = temp_path();
1847 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1848 let namespace_ident_a = NamespaceIdent::new("a".into());
1849 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1850 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1851
1852 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1853
1854 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1855
1856 assert!(
1857 catalog
1858 .namespace_exists(&namespace_ident_a_b)
1859 .await
1860 .unwrap()
1861 );
1862 }
1863
1864 #[tokio::test]
1865 async fn test_list_tables_returns_empty_vector() {
1866 let warehouse_loc = temp_path();
1867 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1868 let namespace_ident = NamespaceIdent::new("a".into());
1869 create_namespace(&catalog, &namespace_ident).await;
1870
1871 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1872 }
1873
1874 #[tokio::test]
1875 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1876 let warehouse_loc = temp_path();
1877 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1878
1879 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1880
1881 assert_eq!(
1882 catalog
1883 .list_tables(&non_existent_namespace_ident)
1884 .await
1885 .unwrap_err()
1886 .to_string(),
1887 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1888 );
1889 }
1890
1891 #[tokio::test]
1892 async fn test_create_table_with_location() {
1893 let warehouse_loc = temp_path();
1894 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1895 let namespace_ident = NamespaceIdent::new("a".into());
1896 create_namespace(&catalog, &namespace_ident).await;
1897
1898 let table_name = "abc";
1899 let location = warehouse_loc.clone();
1900 let table_creation = TableCreation::builder()
1901 .name(table_name.into())
1902 .location(location.clone())
1903 .schema(simple_table_schema())
1904 .build();
1905
1906 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1907
1908 assert_table_eq(
1909 &catalog
1910 .create_table(&namespace_ident, table_creation)
1911 .await
1912 .unwrap(),
1913 &expected_table_ident,
1914 &simple_table_schema(),
1915 );
1916
1917 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1918
1919 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1920
1921 assert!(
1922 table
1923 .metadata_location()
1924 .unwrap()
1925 .to_string()
1926 .starts_with(&location)
1927 )
1928 }
1929
1930 #[tokio::test]
1931 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1932 let warehouse_loc = temp_path();
1933 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1934
1935 let namespace_ident = NamespaceIdent::new("a".into());
1936 let mut namespace_properties = HashMap::new();
1937 let namespace_location = temp_path();
1938 namespace_properties.insert(
1939 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1940 namespace_location.to_string(),
1941 );
1942 catalog
1943 .create_namespace(&namespace_ident, namespace_properties)
1944 .await
1945 .unwrap();
1946
1947 let table_name = "tbl1";
1948 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1949 let expected_table_metadata_location_regex =
1950 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1951
1952 let table = catalog
1953 .create_table(
1954 &namespace_ident,
1955 TableCreation::builder()
1956 .name(table_name.into())
1957 .schema(simple_table_schema())
1958 .build(),
1960 )
1961 .await
1962 .unwrap();
1963 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1964 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1965
1966 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1967 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1968 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1969 }
1970
1971 #[tokio::test]
1972 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1973 {
1974 let warehouse_loc = temp_path();
1975 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1976
1977 let namespace_ident = NamespaceIdent::new("a".into());
1978 let mut namespace_properties = HashMap::new();
1979 let namespace_location = temp_path();
1980 namespace_properties.insert(
1981 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1982 namespace_location.to_string(),
1983 );
1984 catalog
1985 .create_namespace(&namespace_ident, namespace_properties)
1986 .await
1987 .unwrap();
1988
1989 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1990 let mut nested_namespace_properties = HashMap::new();
1991 let nested_namespace_location = temp_path();
1992 nested_namespace_properties.insert(
1993 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1994 nested_namespace_location.to_string(),
1995 );
1996 catalog
1997 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1998 .await
1999 .unwrap();
2000
2001 let table_name = "tbl1";
2002 let expected_table_ident =
2003 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2004 let expected_table_metadata_location_regex = format!(
2005 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
2006 );
2007
2008 let table = catalog
2009 .create_table(
2010 &nested_namespace_ident,
2011 TableCreation::builder()
2012 .name(table_name.into())
2013 .schema(simple_table_schema())
2014 .build(),
2016 )
2017 .await
2018 .unwrap();
2019 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2020 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2021
2022 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2023 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2024 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2025 }
2026
2027 #[tokio::test]
2028 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2029 {
2030 let warehouse_loc = temp_path();
2031 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2032
2033 let namespace_ident = NamespaceIdent::new("a".into());
2034 let namespace_properties = HashMap::new();
2036 catalog
2037 .create_namespace(&namespace_ident, namespace_properties)
2038 .await
2039 .unwrap();
2040
2041 let table_name = "tbl1";
2042 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2043 let expected_table_metadata_location_regex =
2044 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2045
2046 let table = catalog
2047 .create_table(
2048 &namespace_ident,
2049 TableCreation::builder()
2050 .name(table_name.into())
2051 .schema(simple_table_schema())
2052 .build(),
2054 )
2055 .await
2056 .unwrap();
2057 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2058 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2059
2060 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2061 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2062 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2063 }
2064
2065 #[tokio::test]
2066 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2067 {
2068 let warehouse_loc = temp_path();
2069 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2070
2071 let namespace_ident = NamespaceIdent::new("a".into());
2072 create_namespace(&catalog, &namespace_ident).await;
2073
2074 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2075 create_namespace(&catalog, &nested_namespace_ident).await;
2076
2077 let table_name = "tbl1";
2078 let expected_table_ident =
2079 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2080 let expected_table_metadata_location_regex =
2081 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2082
2083 let table = catalog
2084 .create_table(
2085 &nested_namespace_ident,
2086 TableCreation::builder()
2087 .name(table_name.into())
2088 .schema(simple_table_schema())
2089 .build(),
2091 )
2092 .await
2093 .unwrap();
2094 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2095 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2096
2097 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2098 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2099 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2100 }
2101
2102 #[tokio::test]
2103 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2104 let warehouse_loc = temp_path();
2105 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2106 let namespace_ident = NamespaceIdent::new("a".into());
2107 create_namespace(&catalog, &namespace_ident).await;
2108 let table_name = "tbl1";
2109 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2110 create_table(&catalog, &table_ident).await;
2111
2112 let tmp_dir = TempDir::new().unwrap();
2113 let location = tmp_dir.path().to_str().unwrap().to_string();
2114
2115 assert_eq!(
2116 catalog
2117 .create_table(
2118 &namespace_ident,
2119 TableCreation::builder()
2120 .name(table_name.into())
2121 .schema(simple_table_schema())
2122 .location(location)
2123 .build()
2124 )
2125 .await
2126 .unwrap_err()
2127 .to_string(),
2128 format!("Unexpected => Table {:?} already exists.", &table_ident)
2129 );
2130 }
2131
2132 #[tokio::test]
2133 async fn test_rename_table_in_same_namespace() {
2134 let warehouse_loc = temp_path();
2135 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2136 let namespace_ident = NamespaceIdent::new("n1".into());
2137 create_namespace(&catalog, &namespace_ident).await;
2138 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2139 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2140 create_table(&catalog, &src_table_ident).await;
2141
2142 catalog
2143 .rename_table(&src_table_ident, &dst_table_ident)
2144 .await
2145 .unwrap();
2146
2147 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2148 dst_table_ident
2149 ],);
2150 }
2151
2152 #[tokio::test]
2153 async fn test_rename_table_across_namespaces() {
2154 let warehouse_loc = temp_path();
2155 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2156 let src_namespace_ident = NamespaceIdent::new("a".into());
2157 let dst_namespace_ident = NamespaceIdent::new("b".into());
2158 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2159 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2160 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2161 create_table(&catalog, &src_table_ident).await;
2162
2163 catalog
2164 .rename_table(&src_table_ident, &dst_table_ident)
2165 .await
2166 .unwrap();
2167
2168 assert_eq!(
2169 catalog.list_tables(&src_namespace_ident).await.unwrap(),
2170 vec![],
2171 );
2172
2173 assert_eq!(
2174 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2175 vec![dst_table_ident],
2176 );
2177 }
2178
2179 #[tokio::test]
2180 async fn test_rename_table_src_table_is_same_as_dst_table() {
2181 let warehouse_loc = temp_path();
2182 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2183 let namespace_ident = NamespaceIdent::new("n1".into());
2184 create_namespace(&catalog, &namespace_ident).await;
2185 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2186 create_table(&catalog, &table_ident).await;
2187
2188 catalog
2189 .rename_table(&table_ident, &table_ident)
2190 .await
2191 .unwrap();
2192
2193 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2194 table_ident
2195 ],);
2196 }
2197
2198 #[tokio::test]
2199 async fn test_rename_table_across_nested_namespaces() {
2200 let warehouse_loc = temp_path();
2201 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2202 let namespace_ident_a = NamespaceIdent::new("a".into());
2203 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2204 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2205 create_namespaces(&catalog, &vec![
2206 &namespace_ident_a,
2207 &namespace_ident_a_b,
2208 &namespace_ident_a_b_c,
2209 ])
2210 .await;
2211
2212 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2213 create_tables(&catalog, vec![&src_table_ident]).await;
2214
2215 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2216 catalog
2217 .rename_table(&src_table_ident, &dst_table_ident)
2218 .await
2219 .unwrap();
2220
2221 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2222
2223 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2224 }
2225
2226 #[tokio::test]
2227 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2228 let warehouse_loc = temp_path();
2229 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2230 let src_namespace_ident = NamespaceIdent::new("n1".into());
2231 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2232 create_namespace(&catalog, &src_namespace_ident).await;
2233 create_table(&catalog, &src_table_ident).await;
2234
2235 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2236 let dst_table_ident =
2237 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2238 assert_eq!(
2239 catalog
2240 .rename_table(&src_table_ident, &dst_table_ident)
2241 .await
2242 .unwrap_err()
2243 .to_string(),
2244 format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2245 );
2246 }
2247
2248 #[tokio::test]
2249 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2250 let warehouse_loc = temp_path();
2251 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2252 let namespace_ident = NamespaceIdent::new("n1".into());
2253 create_namespace(&catalog, &namespace_ident).await;
2254 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2255 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2256
2257 assert_eq!(
2258 catalog
2259 .rename_table(&src_table_ident, &dst_table_ident)
2260 .await
2261 .unwrap_err()
2262 .to_string(),
2263 format!("Unexpected => No such table: {src_table_ident:?}"),
2264 );
2265 }
2266
2267 #[tokio::test]
2268 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2269 let warehouse_loc = temp_path();
2270 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2271 let namespace_ident = NamespaceIdent::new("n1".into());
2272 create_namespace(&catalog, &namespace_ident).await;
2273 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2274 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2275 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2276
2277 assert_eq!(
2278 catalog
2279 .rename_table(&src_table_ident, &dst_table_ident)
2280 .await
2281 .unwrap_err()
2282 .to_string(),
2283 format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2284 );
2285 }
2286
2287 #[tokio::test]
2288 async fn test_drop_table_throws_error_if_table_not_exist() {
2289 let warehouse_loc = temp_path();
2290 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2291 let namespace_ident = NamespaceIdent::new("a".into());
2292 let table_name = "tbl1";
2293 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2294 create_namespace(&catalog, &namespace_ident).await;
2295
2296 let err = catalog
2297 .drop_table(&table_ident)
2298 .await
2299 .unwrap_err()
2300 .to_string();
2301 assert_eq!(
2302 err,
2303 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2304 );
2305 }
2306
2307 #[tokio::test]
2308 async fn test_drop_table() {
2309 let warehouse_loc = temp_path();
2310 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2311 let namespace_ident = NamespaceIdent::new("a".into());
2312 let table_name = "tbl1";
2313 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2314 create_namespace(&catalog, &namespace_ident).await;
2315
2316 let location = warehouse_loc.clone();
2317 let table_creation = TableCreation::builder()
2318 .name(table_name.into())
2319 .location(location.clone())
2320 .schema(simple_table_schema())
2321 .build();
2322
2323 catalog
2324 .create_table(&namespace_ident, table_creation)
2325 .await
2326 .unwrap();
2327
2328 let table = catalog.load_table(&table_ident).await.unwrap();
2329 assert_table_eq(&table, &table_ident, &simple_table_schema());
2330
2331 catalog.drop_table(&table_ident).await.unwrap();
2332 let err = catalog
2333 .load_table(&table_ident)
2334 .await
2335 .unwrap_err()
2336 .to_string();
2337 assert_eq!(
2338 err,
2339 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2340 );
2341 }
2342
2343 #[tokio::test]
2344 async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2345 let warehouse_loc = temp_path();
2346 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2347 let namespace_ident = NamespaceIdent::new("a".into());
2348 create_namespace(&catalog, &namespace_ident).await;
2349 let table_name = "tbl1";
2350 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2351 create_table(&catalog, &table_ident).await;
2352
2353 assert_eq!(
2354 catalog
2355 .register_table(&table_ident, warehouse_loc)
2356 .await
2357 .unwrap_err()
2358 .to_string(),
2359 format!("Unexpected => Table {:?} already exists.", &table_ident)
2360 );
2361 }
2362
2363 #[tokio::test]
2364 async fn test_register_table() {
2365 let warehouse_loc = temp_path();
2366 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2367 let namespace_ident = NamespaceIdent::new("a".into());
2368 create_namespace(&catalog, &namespace_ident).await;
2369
2370 let table_name = "abc";
2371 let location = warehouse_loc.clone();
2372 let table_creation = TableCreation::builder()
2373 .name(table_name.into())
2374 .location(location.clone())
2375 .schema(simple_table_schema())
2376 .build();
2377
2378 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2379 let expected_table = catalog
2380 .create_table(&namespace_ident, table_creation)
2381 .await
2382 .unwrap();
2383
2384 let metadata_location = expected_table
2385 .metadata_location()
2386 .expect("Expected metadata location to be set")
2387 .to_string();
2388
2389 assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2390
2391 let _ = catalog.drop_table(&table_ident).await;
2392
2393 let table = catalog
2394 .register_table(&table_ident, metadata_location.clone())
2395 .await
2396 .unwrap();
2397
2398 assert_eq!(table.identifier(), expected_table.identifier());
2399 assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2400 }
2401
2402 #[tokio::test]
2403 async fn test_update_table() {
2404 let warehouse_loc = temp_path();
2405 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2406
2407 let namespace_ident = NamespaceIdent::new("ns1".into());
2409 create_namespace(&catalog, &namespace_ident).await;
2410 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2411 create_table(&catalog, &table_ident).await;
2412
2413 let table = catalog.load_table(&table_ident).await.unwrap();
2414
2415 let original_metadata_location = table.metadata_location().unwrap().to_string();
2417
2418 let tx = Transaction::new(&table);
2420 let tx = tx
2421 .update_table_properties()
2422 .set("test_property".to_string(), "test_value".to_string())
2423 .apply(tx)
2424 .unwrap();
2425
2426 let updated_table = tx.commit(&catalog).await.unwrap();
2428
2429 assert_eq!(
2431 updated_table.metadata().properties().get("test_property"),
2432 Some(&"test_value".to_string())
2433 );
2434 assert_ne!(
2436 updated_table.metadata_location().unwrap(),
2437 original_metadata_location.as_str()
2438 );
2439
2440 let reloaded = catalog.load_table(&table_ident).await.unwrap();
2442
2443 assert_eq!(
2445 reloaded.metadata().properties().get("test_property"),
2446 Some(&"test_value".to_string())
2447 );
2448 assert_eq!(
2449 reloaded.metadata_location(),
2450 updated_table.metadata_location()
2451 );
2452 }
2453}