Skip to main content

iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29    TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Catalog property key holding the database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Catalog property key holding the warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Catalog property key selecting the SQL placeholder bind style.
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Table storing one row per catalog entry, followed by its column names.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value of the record-type column that marks a row as a table.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Table storing namespace properties as key/value rows, followed by its column names.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for a default table location.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";

// Connection-pool defaults, used when the matching `pool.*` property is not provided.
// Maximum number of pooled connections (`pool.max-connections`).
const MAX_CONNECTIONS: u32 = 10;
// Seconds a connection may sit idle before it is closed (`pool.idle-timeout`).
const IDLE_TIMEOUT: u64 = 10;
// Whether each connection is health-checked before it is handed out (`pool.test-before-acquire`).
const TEST_BEFORE_ACQUIRE: bool = true;
64
/// Builder for [`SqlCatalog`].
///
/// Accumulates the connection URI, warehouse location, bind style and free-form
/// properties. [`CatalogBuilder::load`] merges in the properties passed at load time
/// and then connects to the database through `SqlCatalog::new`.
#[derive(Debug)]
pub struct SqlCatalogBuilder {
    // Configuration filled in by the builder methods and by `load`.
    config: SqlCatalogConfig,
    // Storage backend used to build the catalog's `FileIO`; `SqlCatalog::new` fails
    // if this is still `None` when the catalog is loaded.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
71
72impl Default for SqlCatalogBuilder {
73    fn default() -> Self {
74        Self {
75            config: SqlCatalogConfig {
76                uri: "".to_string(),
77                name: "".to_string(),
78                warehouse_location: "".to_string(),
79                sql_bind_style: SqlBindStyle::DollarNumeric,
80                props: HashMap::new(),
81            },
82            storage_factory: None,
83        }
84    }
85}
86
87impl SqlCatalogBuilder {
88    /// Configure the database URI
89    ///
90    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
91    /// that value takes precedence, and the value specified by this method will not be used.
92    pub fn uri(mut self, uri: impl Into<String>) -> Self {
93        self.config.uri = uri.into();
94        self
95    }
96
97    /// Configure the warehouse location
98    ///
99    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
100    /// that value takes precedence, and the value specified by this method will not be used.
101    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
102        self.config.warehouse_location = location.into();
103        self
104    }
105
106    /// Configure the bound SQL Statement
107    ///
108    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
109    /// that value takes precedence, and the value specified by this method will not be used.
110    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
111        self.config.sql_bind_style = sql_bind_style;
112        self
113    }
114
115    /// Configure the any properties
116    ///
117    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
118    /// those values will take precedence.
119    pub fn props(mut self, props: HashMap<String, String>) -> Self {
120        for (k, v) in props {
121            self.config.props.insert(k, v);
122        }
123        self
124    }
125
126    /// Set a new property on the property to be configured.
127    /// When multiple methods are executed with the same key,
128    /// the later-set value takes precedence.
129    ///
130    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
131    /// those values will take precedence.
132    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133        self.config.props.insert(key.into(), value.into());
134        self
135    }
136}
137
138impl CatalogBuilder for SqlCatalogBuilder {
139    type C = SqlCatalog;
140
141    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
142        self.storage_factory = Some(storage_factory);
143        self
144    }
145
146    fn load(
147        mut self,
148        name: impl Into<String>,
149        props: HashMap<String, String>,
150    ) -> impl Future<Output = Result<Self::C>> + Send {
151        for (k, v) in props {
152            self.config.props.insert(k, v);
153        }
154
155        if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
156            self.config.uri = uri;
157        }
158        if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
159            self.config.warehouse_location = warehouse_location;
160        }
161
162        let name = name.into();
163
164        let mut valid_sql_bind_style = true;
165        if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
166            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
167                self.config.sql_bind_style = sql_bind_style;
168            } else {
169                valid_sql_bind_style = false;
170            }
171        }
172
173        let valid_name = !name.trim().is_empty();
174
175        async move {
176            if !valid_name {
177                Err(Error::new(
178                    ErrorKind::DataInvalid,
179                    "Catalog name cannot be empty",
180                ))
181            } else if !valid_sql_bind_style {
182                Err(Error::new(
183                    ErrorKind::DataInvalid,
184                    format!(
185                        "`{}` values are valid only if they're `{}` or `{}`",
186                        SQL_CATALOG_PROP_BIND_STYLE,
187                        SqlBindStyle::DollarNumeric,
188                        SqlBindStyle::QMark
189                    ),
190                ))
191            } else {
192                self.config.name = name;
193                SqlCatalog::new(self.config, self.storage_factory).await
194            }
195        }
196    }
197}
198
199/// A struct representing the SQL catalog configuration.
200///
201/// This struct contains various parameters that are used to configure a SQL catalog,
202/// such as the database URI, warehouse location, and file I/O settings.
203/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
204/// The options available for this parameter include:
205/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
206/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
207#[derive(Debug)]
208struct SqlCatalogConfig {
209    uri: String,
210    name: String,
211    warehouse_location: String,
212    sql_bind_style: SqlBindStyle,
213    props: HashMap<String, String>,
214}
215
/// SQL-backed Iceberg catalog implementation.
///
/// Table and namespace records live in the `iceberg_tables` and
/// `iceberg_namespace_properties` database tables, reached through a sqlx `AnyPool`;
/// table metadata files are read and written through `fileio`.
#[derive(Debug)]
pub struct SqlCatalog {
    // Catalog name used to scope every row this catalog reads or writes.
    name: String,
    // Connection pool shared by all catalog operations.
    connection: AnyPool,
    // Root used for table locations when neither the creation request nor the
    // namespace provides one.
    warehouse_location: String,
    // File IO used to read and write table metadata files.
    fileio: FileIO,
    // Placeholder style applied by `replace_placeholders`.
    sql_bind_style: SqlBindStyle,
}
225
226#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
227/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB)
228pub enum SqlBindStyle {
229    /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style
230    DollarNumeric,
231    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
232    QMark,
233}
234
235impl SqlCatalog {
236    /// Create new sql catalog instance
237    async fn new(
238        config: SqlCatalogConfig,
239        storage_factory: Option<Arc<dyn StorageFactory>>,
240    ) -> Result<Self> {
241        let factory = storage_factory.ok_or_else(|| {
242            Error::new(
243                ErrorKind::Unexpected,
244                "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
245            )
246        })?;
247        let fileio = FileIOBuilder::new(factory).build();
248
249        install_default_drivers();
250        let max_connections: u32 = config
251            .props
252            .get("pool.max-connections")
253            .map(|v| v.parse().unwrap())
254            .unwrap_or(MAX_CONNECTIONS);
255        let idle_timeout: u64 = config
256            .props
257            .get("pool.idle-timeout")
258            .map(|v| v.parse().unwrap())
259            .unwrap_or(IDLE_TIMEOUT);
260        let test_before_acquire: bool = config
261            .props
262            .get("pool.test-before-acquire")
263            .map(|v| v.parse().unwrap())
264            .unwrap_or(TEST_BEFORE_ACQUIRE);
265
266        let pool = AnyPoolOptions::new()
267            .max_connections(max_connections)
268            .idle_timeout(Duration::from_secs(idle_timeout))
269            .test_before_acquire(test_before_acquire)
270            .connect(&config.uri)
271            .await
272            .map_err(from_sqlx_error)?;
273
274        sqlx::query(&format!(
275            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
276                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
277                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
278                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
279                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
280                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
281                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
282                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
283        ))
284        .execute(&pool)
285        .await
286        .map_err(from_sqlx_error)?;
287
288        sqlx::query(&format!(
289            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
290                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
291                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
292                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
293                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
294                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
295        ))
296        .execute(&pool)
297        .await
298        .map_err(from_sqlx_error)?;
299
300        Ok(SqlCatalog {
301            name: config.name.to_owned(),
302            connection: pool,
303            warehouse_location: config.warehouse_location,
304            fileio,
305            sql_bind_style: config.sql_bind_style,
306        })
307    }
308
309    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
310    fn replace_placeholders(&self, query: &str) -> String {
311        match self.sql_bind_style {
312            SqlBindStyle::DollarNumeric => {
313                let mut count = 1;
314                query
315                    .chars()
316                    .fold(String::with_capacity(query.len()), |mut acc, c| {
317                        if c == '?' {
318                            acc.push('$');
319                            acc.push_str(&count.to_string());
320                            count += 1;
321                        } else {
322                            acc.push(c);
323                        }
324                        acc
325                    })
326            }
327            _ => query.to_owned(),
328        }
329    }
330
331    /// Fetch a vec of AnyRows from a given query
332    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
333        let query_with_placeholders = self.replace_placeholders(query);
334
335        let mut sqlx_query = sqlx::query(&query_with_placeholders);
336        for arg in args {
337            sqlx_query = sqlx_query.bind(arg);
338        }
339
340        sqlx_query
341            .fetch_all(&self.connection)
342            .await
343            .map_err(from_sqlx_error)
344    }
345
346    /// Execute statements in a transaction, provided or not
347    async fn execute(
348        &self,
349        query: &str,
350        args: Vec<Option<&str>>,
351        transaction: Option<&mut Transaction<'_, Any>>,
352    ) -> Result<AnyQueryResult> {
353        let query_with_placeholders = self.replace_placeholders(query);
354
355        let mut sqlx_query = sqlx::query(&query_with_placeholders);
356        for arg in args {
357            sqlx_query = sqlx_query.bind(arg);
358        }
359
360        match transaction {
361            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
362            None => {
363                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
364                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
365                let _ = tx.commit().await.map_err(from_sqlx_error);
366                result
367            }
368        }
369    }
370}
371
372#[async_trait]
373impl Catalog for SqlCatalog {
374    async fn list_namespaces(
375        &self,
376        parent: Option<&NamespaceIdent>,
377    ) -> Result<Vec<NamespaceIdent>> {
378        // UNION will remove duplicates.
379        let all_namespaces_stmt = format!(
380            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
381             FROM {CATALOG_TABLE_NAME}
382             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
383             UNION
384             SELECT {NAMESPACE_FIELD_NAME}
385             FROM {NAMESPACE_TABLE_NAME}
386             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
387        );
388
389        let namespace_rows = self
390            .fetch_rows(&all_namespaces_stmt, vec![
391                Some(&self.name),
392                Some(&self.name),
393            ])
394            .await?;
395
396        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
397
398        if let Some(parent) = parent {
399            if self.namespace_exists(parent).await? {
400                let parent_str = parent.join(".");
401
402                for row in namespace_rows.iter() {
403                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
404                    // if parent = a, then we only want to see a.b, a.c returned.
405                    if nsp != parent_str && nsp.starts_with(&parent_str) {
406                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
407                    }
408                }
409
410                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
411            } else {
412                no_such_namespace_err(parent)
413            }
414        } else {
415            for row in namespace_rows.iter() {
416                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
417                let mut levels = nsp.split(".").collect::<Vec<&str>>();
418                if !levels.is_empty() {
419                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
420                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
421                }
422            }
423
424            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425        }
426    }
427
428    async fn create_namespace(
429        &self,
430        namespace: &NamespaceIdent,
431        properties: HashMap<String, String>,
432    ) -> Result<Namespace> {
433        let exists = self.namespace_exists(namespace).await?;
434
435        if exists {
436            return Err(Error::new(
437                iceberg::ErrorKind::Unexpected,
438                format!("Namespace {namespace:?} already exists"),
439            ));
440        }
441
442        let namespace_str = namespace.join(".");
443        let insert = format!(
444            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
445             VALUES (?, ?, ?, ?)");
446        if !properties.is_empty() {
447            let mut insert_properties = properties.clone();
448            insert_properties.insert("exists".to_string(), "true".to_string());
449
450            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
451            let mut insert_stmt = insert.clone();
452            for (index, (key, value)) in insert_properties.iter().enumerate() {
453                query_args.extend_from_slice(&[
454                    Some(self.name.as_str()),
455                    Some(namespace_str.as_str()),
456                    Some(key.as_str()),
457                    Some(value.as_str()),
458                ]);
459                if index > 0 {
460                    insert_stmt.push_str(", (?, ?, ?, ?)");
461                }
462            }
463
464            self.execute(&insert_stmt, query_args, None).await?;
465
466            Ok(Namespace::with_properties(
467                namespace.clone(),
468                insert_properties,
469            ))
470        } else {
471            // set a default property of exists = true
472            self.execute(
473                &insert,
474                vec![
475                    Some(&self.name),
476                    Some(&namespace_str),
477                    Some("exists"),
478                    Some("true"),
479                ],
480                None,
481            )
482            .await?;
483            Ok(Namespace::with_properties(namespace.clone(), properties))
484        }
485    }
486
487    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
488        let exists = self.namespace_exists(namespace).await?;
489        if exists {
490            let namespace_props = self
491                .fetch_rows(
492                    &format!(
493                        "SELECT
494                            {NAMESPACE_FIELD_NAME},
495                            {NAMESPACE_FIELD_PROPERTY_KEY},
496                            {NAMESPACE_FIELD_PROPERTY_VALUE}
497                            FROM {NAMESPACE_TABLE_NAME}
498                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
499                            AND {NAMESPACE_FIELD_NAME} = ?"
500                    ),
501                    vec![Some(&self.name), Some(&namespace.join("."))],
502                )
503                .await?;
504
505            let mut properties = HashMap::with_capacity(namespace_props.len());
506
507            for row in namespace_props {
508                let key = row
509                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
510                    .map_err(from_sqlx_error)?;
511                let value = row
512                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
513                    .map_err(from_sqlx_error)?;
514
515                properties.insert(key, value);
516            }
517
518            Ok(Namespace::with_properties(namespace.clone(), properties))
519        } else {
520            no_such_namespace_err(namespace)
521        }
522    }
523
524    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
525        let namespace_str = namespace.join(".");
526
527        let table_namespaces = self
528            .fetch_rows(
529                &format!(
530                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
531                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
532                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
533                     LIMIT 1"
534                ),
535                vec![Some(&self.name), Some(&namespace_str)],
536            )
537            .await?;
538
539        if !table_namespaces.is_empty() {
540            Ok(true)
541        } else {
542            let namespaces = self
543                .fetch_rows(
544                    &format!(
545                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
546                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
547                          AND {NAMESPACE_FIELD_NAME} = ?
548                         LIMIT 1"
549                    ),
550                    vec![Some(&self.name), Some(&namespace_str)],
551                )
552                .await?;
553            if !namespaces.is_empty() {
554                Ok(true)
555            } else {
556                Ok(false)
557            }
558        }
559    }
560
561    async fn update_namespace(
562        &self,
563        namespace: &NamespaceIdent,
564        properties: HashMap<String, String>,
565    ) -> Result<()> {
566        let exists = self.namespace_exists(namespace).await?;
567        if exists {
568            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
569            let namespace_str = namespace.join(".");
570
571            let mut updates = vec![];
572            let mut inserts = vec![];
573
574            for (key, value) in properties.iter() {
575                if existing_properties.contains_key(key) {
576                    if existing_properties.get(key) != Some(value) {
577                        updates.push((key, value));
578                    }
579                } else {
580                    inserts.push((key, value));
581                }
582            }
583
584            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
585            let update_stmt = format!(
586                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
587                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
588                 AND {NAMESPACE_FIELD_NAME} = ?
589                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
590            );
591
592            let insert_stmt = format!(
593                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
594                 VALUES (?, ?, ?, ?)"
595            );
596
597            for (key, value) in updates {
598                self.execute(
599                    &update_stmt,
600                    vec![
601                        Some(value),
602                        Some(&self.name),
603                        Some(&namespace_str),
604                        Some(key),
605                    ],
606                    Some(&mut tx),
607                )
608                .await?;
609            }
610
611            for (key, value) in inserts {
612                self.execute(
613                    &insert_stmt,
614                    vec![
615                        Some(&self.name),
616                        Some(&namespace_str),
617                        Some(key),
618                        Some(value),
619                    ],
620                    Some(&mut tx),
621                )
622                .await?;
623            }
624
625            let _ = tx.commit().await.map_err(from_sqlx_error)?;
626
627            Ok(())
628        } else {
629            no_such_namespace_err(namespace)
630        }
631    }
632
633    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
634        let exists = self.namespace_exists(namespace).await?;
635        if exists {
636            // if there are tables in the namespace, don't allow drop.
637            let tables = self.list_tables(namespace).await?;
638            if !tables.is_empty() {
639                return Err(Error::new(
640                    iceberg::ErrorKind::Unexpected,
641                    format!(
642                        "Namespace {:?} is not empty. {} tables exist.",
643                        namespace,
644                        tables.len()
645                    ),
646                ));
647            }
648
649            self.execute(
650                &format!(
651                    "DELETE FROM {NAMESPACE_TABLE_NAME}
652                     WHERE {NAMESPACE_FIELD_NAME} = ?
653                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
654                ),
655                vec![Some(&namespace.join(".")), Some(&self.name)],
656                None,
657            )
658            .await?;
659
660            Ok(())
661        } else {
662            no_such_namespace_err(namespace)
663        }
664    }
665
666    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
667        let exists = self.namespace_exists(namespace).await?;
668        if exists {
669            let rows = self
670                .fetch_rows(
671                    &format!(
672                        "SELECT {CATALOG_FIELD_TABLE_NAME},
673                                {CATALOG_FIELD_TABLE_NAMESPACE}
674                         FROM {CATALOG_TABLE_NAME}
675                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
676                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
677                          AND (
678                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
679                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
680                          )",
681                    ),
682                    vec![Some(&namespace.join(".")), Some(&self.name)],
683                )
684                .await?;
685
686            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
687
688            for row in rows.iter() {
689                let tbl = row
690                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
691                    .map_err(from_sqlx_error)?;
692                let ns_strs = row
693                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
694                    .map_err(from_sqlx_error)?;
695                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
696                tables.insert(TableIdent::new(ns, tbl));
697            }
698
699            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
700        } else {
701            no_such_namespace_err(namespace)
702        }
703    }
704
705    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
706        let namespace = identifier.namespace().join(".");
707        let table_name = identifier.name();
708        let table_counts = self
709            .fetch_rows(
710                &format!(
711                    "SELECT 1
712                     FROM {CATALOG_TABLE_NAME}
713                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
714                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
715                      AND {CATALOG_FIELD_TABLE_NAME} = ?
716                      AND (
717                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
718                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
719                      )"
720                ),
721                vec![Some(&namespace), Some(&self.name), Some(table_name)],
722            )
723            .await?;
724
725        if !table_counts.is_empty() {
726            Ok(true)
727        } else {
728            Ok(false)
729        }
730    }
731
732    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
733        if !self.table_exists(identifier).await? {
734            return no_such_table_err(identifier);
735        }
736
737        self.execute(
738            &format!(
739                "DELETE FROM {CATALOG_TABLE_NAME}
740                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
741                  AND {CATALOG_FIELD_TABLE_NAME} = ?
742                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
743                  AND (
744                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
745                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
746                  )"
747            ),
748            vec![
749                Some(&self.name),
750                Some(identifier.name()),
751                Some(&identifier.namespace().join(".")),
752            ],
753            None,
754        )
755        .await?;
756
757        Ok(())
758    }
759
760    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
761        if !self.table_exists(identifier).await? {
762            return no_such_table_err(identifier);
763        }
764
765        let rows = self
766            .fetch_rows(
767                &format!(
768                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
769                     FROM {CATALOG_TABLE_NAME}
770                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
771                      AND {CATALOG_FIELD_TABLE_NAME} = ?
772                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
773                      AND (
774                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
775                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
776                      )"
777                ),
778                vec![
779                    Some(&self.name),
780                    Some(identifier.name()),
781                    Some(&identifier.namespace().join(".")),
782                ],
783            )
784            .await?;
785
786        if rows.is_empty() {
787            return no_such_table_err(identifier);
788        }
789
790        let row = &rows[0];
791        let tbl_metadata_location = row
792            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
793            .map_err(from_sqlx_error)?;
794
795        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
796
797        Ok(Table::builder()
798            .file_io(self.fileio.clone())
799            .identifier(identifier.clone())
800            .metadata_location(tbl_metadata_location)
801            .metadata(metadata)
802            .build()?)
803    }
804
805    async fn create_table(
806        &self,
807        namespace: &NamespaceIdent,
808        creation: TableCreation,
809    ) -> Result<Table> {
810        if !self.namespace_exists(namespace).await? {
811            return no_such_namespace_err(namespace);
812        }
813
814        let tbl_name = creation.name.clone();
815        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
816
817        if self.table_exists(&tbl_ident).await? {
818            return table_already_exists_err(&tbl_ident);
819        }
820
821        let (tbl_creation, location) = match creation.location.clone() {
822            Some(location) => (creation, location),
823            None => {
824                // fall back to namespace-specific location
825                // and then to warehouse location
826                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
827                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
828                    Some(location) => location.clone(),
829                    None => {
830                        format!(
831                            "{}/{}",
832                            self.warehouse_location.clone(),
833                            namespace.join("/")
834                        )
835                    }
836                };
837
838                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
839
840                (
841                    TableCreation {
842                        location: Some(tbl_location.clone()),
843                        ..creation
844                    },
845                    tbl_location,
846                )
847            }
848        };
849
850        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
851            .build()?
852            .metadata;
853        let tbl_metadata_location =
854            MetadataLocation::new_with_table_location(location.clone()).to_string();
855
856        tbl_metadata
857            .write_to(&self.fileio, &tbl_metadata_location)
858            .await?;
859
860        self.execute(&format!(
861            "INSERT INTO {CATALOG_TABLE_NAME}
862             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
863             VALUES (?, ?, ?, ?, ?)
864            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
865
866        Ok(Table::builder()
867            .file_io(self.fileio.clone())
868            .metadata_location(tbl_metadata_location)
869            .identifier(tbl_ident)
870            .metadata(tbl_metadata)
871            .build()?)
872    }
873
874    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
875        if src == dest {
876            return Ok(());
877        }
878
879        if !self.table_exists(src).await? {
880            return no_such_table_err(src);
881        }
882
883        if !self.namespace_exists(dest.namespace()).await? {
884            return no_such_namespace_err(dest.namespace());
885        }
886
887        if self.table_exists(dest).await? {
888            return table_already_exists_err(dest);
889        }
890
891        self.execute(
892            &format!(
893                "UPDATE {CATALOG_TABLE_NAME}
894                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
895                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
896                  AND {CATALOG_FIELD_TABLE_NAME} = ?
897                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
898                  AND (
899                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
900                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
901                )"
902            ),
903            vec![
904                Some(dest.name()),
905                Some(&dest.namespace().join(".")),
906                Some(&self.name),
907                Some(src.name()),
908                Some(&src.namespace().join(".")),
909            ],
910            None,
911        )
912        .await?;
913
914        Ok(())
915    }
916
917    async fn register_table(
918        &self,
919        table_ident: &TableIdent,
920        metadata_location: String,
921    ) -> Result<Table> {
922        if self.table_exists(table_ident).await? {
923            return table_already_exists_err(table_ident);
924        }
925
926        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
927
928        let namespace = table_ident.namespace();
929        let tbl_name = table_ident.name().to_string();
930
931        self.execute(&format!(
932            "INSERT INTO {CATALOG_TABLE_NAME}
933             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
934             VALUES (?, ?, ?, ?, ?)
935            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
936
937        Ok(Table::builder()
938            .identifier(table_ident.clone())
939            .metadata_location(metadata_location)
940            .metadata(metadata)
941            .file_io(self.fileio.clone())
942            .build()?)
943    }
944
945    /// Updates an existing table within the SQL catalog.
946    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
947        let table_ident = commit.identifier().clone();
948        let current_table = self.load_table(&table_ident).await?;
949        let current_metadata_location = current_table.metadata_location_result()?.to_string();
950
951        let staged_table = commit.apply(current_table)?;
952        let staged_metadata_location = staged_table.metadata_location_result()?;
953
954        staged_table
955            .metadata()
956            .write_to(staged_table.file_io(), &staged_metadata_location)
957            .await?;
958
959        let update_result = self
960            .execute(
961                &format!(
962                    "UPDATE {CATALOG_TABLE_NAME}
963                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
964                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
965                      AND {CATALOG_FIELD_TABLE_NAME} = ?
966                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
967                      AND (
968                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
969                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
970                      )
971                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
972                ),
973                vec![
974                    Some(staged_metadata_location),
975                    Some(current_metadata_location.as_str()),
976                    Some(&self.name),
977                    Some(table_ident.name()),
978                    Some(&table_ident.namespace().join(".")),
979                    Some(current_metadata_location.as_str()),
980                ],
981                None,
982            )
983            .await?;
984
985        if update_result.rows_affected() == 0 {
986            return Err(Error::new(
987                ErrorKind::CatalogCommitConflicts,
988                format!("Commit conflicted for table: {table_ident}"),
989            )
990            .with_retryable(true));
991        }
992
993        Ok(staged_table)
994    }
995}
996
997#[cfg(test)]
998mod tests {
999    use std::collections::{HashMap, HashSet};
1000    use std::hash::Hash;
1001    use std::sync::Arc;
1002
1003    use iceberg::io::LocalFsStorageFactory;
1004    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1005    use iceberg::table::Table;
1006    use iceberg::transaction::{ApplyTransactionAction, Transaction};
1007    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1008    use itertools::Itertools;
1009    use regex::Regex;
1010    use sqlx::migrate::MigrateDatabase;
1011    use tempfile::TempDir;
1012
1013    use crate::catalog::{
1014        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1015        SQL_CATALOG_PROP_WAREHOUSE,
1016    };
1017    use crate::{SqlBindStyle, SqlCatalogBuilder};
1018
    /// Regex source matching a lowercase, hyphenated UUID (8-4-4-4-12 hex digits).
    /// It is not anchored, so it matches a UUID anywhere inside a larger string.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1020
1021    fn temp_path() -> String {
1022        let temp_dir = TempDir::new().unwrap();
1023        temp_dir.path().to_str().unwrap().to_string()
1024    }
1025
1026    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1027        HashSet::from_iter(vec)
1028    }
1029
1030    fn default_properties() -> HashMap<String, String> {
1031        HashMap::from([("exists".to_string(), "true".to_string())])
1032    }
1033
1034    /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg".
1035    async fn new_sql_catalog(
1036        warehouse_location: String,
1037        name: Option<impl ToString>,
1038    ) -> impl Catalog {
1039        let name = if let Some(name) = name {
1040            name.to_string()
1041        } else {
1042            "iceberg".to_string()
1043        };
1044        let sql_lite_uri = format!("sqlite:{}", temp_path());
1045        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1046
1047        let props = HashMap::from_iter([
1048            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1049            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1050            (
1051                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1052                SqlBindStyle::DollarNumeric.to_string(),
1053            ),
1054        ]);
1055        SqlCatalogBuilder::default()
1056            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1057            .load(&name, props)
1058            .await
1059            .unwrap()
1060    }
1061
1062    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1063        let _ = catalog
1064            .create_namespace(namespace_ident, HashMap::new())
1065            .await
1066            .unwrap();
1067    }
1068
1069    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1070        for namespace_ident in namespace_idents {
1071            let _ = create_namespace(catalog, namespace_ident).await;
1072        }
1073    }
1074
1075    fn simple_table_schema() -> Schema {
1076        Schema::builder()
1077            .with_fields(vec![
1078                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1079            ])
1080            .build()
1081            .unwrap()
1082    }
1083
1084    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1085        let _ = catalog
1086            .create_table(
1087                &table_ident.namespace,
1088                TableCreation::builder()
1089                    .name(table_ident.name().into())
1090                    .schema(simple_table_schema())
1091                    .location(temp_path())
1092                    .build(),
1093            )
1094            .await
1095            .unwrap();
1096    }
1097
1098    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1099        for table_ident in table_idents {
1100            create_table(catalog, table_ident).await;
1101        }
1102    }
1103
1104    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1105        assert_eq!(table.identifier(), expected_table_ident);
1106
1107        let metadata = table.metadata();
1108
1109        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1110
1111        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1112            .with_spec_id(0)
1113            .build()
1114            .unwrap();
1115
1116        assert_eq!(
1117            metadata
1118                .partition_specs_iter()
1119                .map(|p| p.as_ref())
1120                .collect_vec(),
1121            vec![&expected_partition_spec]
1122        );
1123
1124        let expected_sorted_order = SortOrder::builder()
1125            .with_order_id(0)
1126            .with_fields(vec![])
1127            .build(expected_schema)
1128            .unwrap();
1129
1130        assert_eq!(
1131            metadata
1132                .sort_orders_iter()
1133                .map(|s| s.as_ref())
1134                .collect_vec(),
1135            vec![&expected_sorted_order]
1136        );
1137
1138        assert_eq!(metadata.properties(), &HashMap::new());
1139
1140        assert!(!table.readonly());
1141    }
1142
1143    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1144        let actual = table.metadata_location().unwrap().to_string();
1145        let regex = Regex::new(regex_str).unwrap();
1146        assert!(regex.is_match(&actual))
1147    }
1148
1149    #[tokio::test]
1150    async fn test_initialized() {
1151        let warehouse_loc = temp_path();
1152        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1153        // catalog instantiation should not fail even if tables exist
1154        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1155        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1156    }
1157
1158    #[tokio::test]
1159    async fn test_builder_method() {
1160        let sql_lite_uri = format!("sqlite:{}", temp_path());
1161        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1162        let warehouse_location = temp_path();
1163
1164        let catalog = SqlCatalogBuilder::default()
1165            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1166            .uri(sql_lite_uri.to_string())
1167            .warehouse_location(warehouse_location.clone())
1168            .sql_bind_style(SqlBindStyle::QMark)
1169            .load("iceberg", HashMap::default())
1170            .await;
1171        assert!(catalog.is_ok());
1172
1173        let catalog = catalog.unwrap();
1174        assert!(catalog.warehouse_location == warehouse_location);
1175        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1176    }
1177
1178    /// Overwriting an sqlite database with a non-existent path causes
1179    /// catalog generation to fail
1180    #[tokio::test]
1181    async fn test_builder_props_non_existent_path_fails() {
1182        let sql_lite_uri = format!("sqlite:{}", temp_path());
1183        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1184        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1185        let warehouse_location = temp_path();
1186
1187        let catalog = SqlCatalogBuilder::default()
1188            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1189            .uri(sql_lite_uri)
1190            .warehouse_location(warehouse_location)
1191            .load(
1192                "iceberg",
1193                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1194            )
1195            .await;
1196        assert!(catalog.is_err());
1197    }
1198
1199    /// Even when an invalid URI is specified in a builder method,
1200    /// it can be successfully overridden with a valid URI in props
1201    /// for catalog generation to succeed.
1202    #[tokio::test]
1203    async fn test_builder_props_set_valid_uri() {
1204        let sql_lite_uri = format!("sqlite:{}", temp_path());
1205        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1206        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1207        let warehouse_location = temp_path();
1208
1209        let catalog = SqlCatalogBuilder::default()
1210            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1211            .uri(sql_lite_uri2)
1212            .warehouse_location(warehouse_location)
1213            .load(
1214                "iceberg",
1215                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1216            )
1217            .await;
1218        assert!(catalog.is_ok());
1219    }
1220
1221    /// values assigned via props take precedence
1222    #[tokio::test]
1223    async fn test_builder_props_take_precedence() {
1224        let sql_lite_uri = format!("sqlite:{}", temp_path());
1225        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1226        let warehouse_location = temp_path();
1227        let warehouse_location2 = temp_path();
1228
1229        let catalog = SqlCatalogBuilder::default()
1230            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1231            .warehouse_location(warehouse_location2)
1232            .sql_bind_style(SqlBindStyle::DollarNumeric)
1233            .load(
1234                "iceberg",
1235                HashMap::from_iter([
1236                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1237                    (
1238                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1239                        warehouse_location.clone(),
1240                    ),
1241                    (
1242                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1243                        SqlBindStyle::QMark.to_string(),
1244                    ),
1245                ]),
1246            )
1247            .await;
1248
1249        assert!(catalog.is_ok());
1250
1251        let catalog = catalog.unwrap();
1252        assert!(catalog.warehouse_location == warehouse_location);
1253        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1254    }
1255
1256    /// values assigned via props take precedence
1257    #[tokio::test]
1258    async fn test_builder_props_take_precedence_props() {
1259        let sql_lite_uri = format!("sqlite:{}", temp_path());
1260        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1261        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1262        let warehouse_location = temp_path();
1263        let warehouse_location2 = temp_path();
1264
1265        let props = HashMap::from_iter([
1266            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1267            (
1268                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1269                warehouse_location.clone(),
1270            ),
1271            (
1272                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1273                SqlBindStyle::QMark.to_string(),
1274            ),
1275        ]);
1276        let props2 = HashMap::from_iter([
1277            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1278            (
1279                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1280                warehouse_location2.clone(),
1281            ),
1282            (
1283                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1284                SqlBindStyle::DollarNumeric.to_string(),
1285            ),
1286        ]);
1287
1288        let catalog = SqlCatalogBuilder::default()
1289            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1290            .props(props2)
1291            .load("iceberg", props)
1292            .await;
1293
1294        assert!(catalog.is_ok());
1295
1296        let catalog = catalog.unwrap();
1297        assert!(catalog.warehouse_location == warehouse_location);
1298        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1299    }
1300
1301    /// values assigned via props take precedence
1302    #[tokio::test]
1303    async fn test_builder_props_take_precedence_prop() {
1304        let sql_lite_uri = format!("sqlite:{}", temp_path());
1305        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1306        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1307        let warehouse_location = temp_path();
1308        let warehouse_location2 = temp_path();
1309
1310        let props = HashMap::from_iter([
1311            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1312            (
1313                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1314                warehouse_location.clone(),
1315            ),
1316            (
1317                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1318                SqlBindStyle::QMark.to_string(),
1319            ),
1320        ]);
1321
1322        let catalog = SqlCatalogBuilder::default()
1323            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1324            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1325            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1326            .prop(
1327                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1328                SqlBindStyle::DollarNumeric.to_string(),
1329            )
1330            .load("iceberg", props)
1331            .await;
1332
1333        assert!(catalog.is_ok());
1334
1335        let catalog = catalog.unwrap();
1336        assert!(catalog.warehouse_location == warehouse_location);
1337        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1338    }
1339
1340    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1341    #[tokio::test]
1342    async fn test_builder_props_invalid_bind_style_fails() {
1343        let sql_lite_uri = format!("sqlite:{}", temp_path());
1344        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1345        let warehouse_location = temp_path();
1346
1347        let catalog = SqlCatalogBuilder::default()
1348            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1349            .load(
1350                "iceberg",
1351                HashMap::from_iter([
1352                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1353                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1354                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1355                ]),
1356            )
1357            .await;
1358
1359        assert!(catalog.is_err());
1360    }
1361
1362    #[tokio::test]
1363    async fn test_list_namespaces_returns_empty_vector() {
1364        let warehouse_loc = temp_path();
1365        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1366
1367        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1368    }
1369
1370    #[tokio::test]
1371    async fn test_list_namespaces_returns_empty_different_name() {
1372        let warehouse_loc = temp_path();
1373        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1374        let namespace_ident_1 = NamespaceIdent::new("a".into());
1375        let namespace_ident_2 = NamespaceIdent::new("b".into());
1376        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1377        assert_eq!(
1378            to_set(catalog.list_namespaces(None).await.unwrap()),
1379            to_set(vec![namespace_ident_1, namespace_ident_2])
1380        );
1381
1382        let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1383        assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1384    }
1385
1386    #[tokio::test]
1387    async fn test_list_namespaces_returns_multiple_namespaces() {
1388        let warehouse_loc = temp_path();
1389        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1390        let namespace_ident_1 = NamespaceIdent::new("a".into());
1391        let namespace_ident_2 = NamespaceIdent::new("b".into());
1392        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1393
1394        assert_eq!(
1395            to_set(catalog.list_namespaces(None).await.unwrap()),
1396            to_set(vec![namespace_ident_1, namespace_ident_2])
1397        );
1398    }
1399
1400    #[tokio::test]
1401    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1402        let warehouse_loc = temp_path();
1403        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1404        let namespace_ident_1 = NamespaceIdent::new("a".into());
1405        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1406        let namespace_ident_3 = NamespaceIdent::new("b".into());
1407        create_namespaces(&catalog, &vec![
1408            &namespace_ident_1,
1409            &namespace_ident_2,
1410            &namespace_ident_3,
1411        ])
1412        .await;
1413
1414        assert_eq!(
1415            to_set(catalog.list_namespaces(None).await.unwrap()),
1416            to_set(vec![namespace_ident_1, namespace_ident_3])
1417        );
1418    }
1419
1420    #[tokio::test]
1421    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1422        let warehouse_loc = temp_path();
1423        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1424        let namespace_ident_1 = NamespaceIdent::new("a".into());
1425        let namespace_ident_2 = NamespaceIdent::new("b".into());
1426        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1427
1428        assert_eq!(
1429            catalog
1430                .list_namespaces(Some(&namespace_ident_1))
1431                .await
1432                .unwrap(),
1433            vec![]
1434        );
1435    }
1436
1437    #[tokio::test]
1438    async fn test_list_namespaces_returns_namespace_under_parent() {
1439        let warehouse_loc = temp_path();
1440        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1441        let namespace_ident_1 = NamespaceIdent::new("a".into());
1442        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1443        let namespace_ident_3 = NamespaceIdent::new("c".into());
1444        create_namespaces(&catalog, &vec![
1445            &namespace_ident_1,
1446            &namespace_ident_2,
1447            &namespace_ident_3,
1448        ])
1449        .await;
1450
1451        assert_eq!(
1452            to_set(catalog.list_namespaces(None).await.unwrap()),
1453            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1454        );
1455
1456        assert_eq!(
1457            catalog
1458                .list_namespaces(Some(&namespace_ident_1))
1459                .await
1460                .unwrap(),
1461            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1462        );
1463    }
1464
1465    #[tokio::test]
1466    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1467        let warehouse_loc = temp_path();
1468        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1469        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1470        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1471        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1472        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1473        let namespace_ident_5 = NamespaceIdent::new("b".into());
1474        create_namespaces(&catalog, &vec![
1475            &namespace_ident_1,
1476            &namespace_ident_2,
1477            &namespace_ident_3,
1478            &namespace_ident_4,
1479            &namespace_ident_5,
1480        ])
1481        .await;
1482
1483        assert_eq!(
1484            to_set(
1485                catalog
1486                    .list_namespaces(Some(&namespace_ident_1))
1487                    .await
1488                    .unwrap()
1489            ),
1490            to_set(vec![
1491                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1492                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1493                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1494            ])
1495        );
1496    }
1497
1498    #[tokio::test]
1499    async fn test_namespace_exists_returns_false() {
1500        let warehouse_loc = temp_path();
1501        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1502        let namespace_ident = NamespaceIdent::new("a".into());
1503        create_namespace(&catalog, &namespace_ident).await;
1504
1505        assert!(
1506            !catalog
1507                .namespace_exists(&NamespaceIdent::new("b".into()))
1508                .await
1509                .unwrap()
1510        );
1511    }
1512
1513    #[tokio::test]
1514    async fn test_namespace_exists_returns_true() {
1515        let warehouse_loc = temp_path();
1516        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1517        let namespace_ident = NamespaceIdent::new("a".into());
1518        create_namespace(&catalog, &namespace_ident).await;
1519
1520        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1521    }
1522
1523    #[tokio::test]
1524    async fn test_create_namespace_with_properties() {
1525        let warehouse_loc = temp_path();
1526        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1527        let namespace_ident = NamespaceIdent::new("abc".into());
1528
1529        let mut properties = default_properties();
1530        properties.insert("k".into(), "v".into());
1531
1532        assert_eq!(
1533            catalog
1534                .create_namespace(&namespace_ident, properties.clone())
1535                .await
1536                .unwrap(),
1537            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1538        );
1539
1540        assert_eq!(
1541            catalog.get_namespace(&namespace_ident).await.unwrap(),
1542            Namespace::with_properties(namespace_ident, properties)
1543        );
1544    }
1545
1546    #[tokio::test]
1547    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1548        let warehouse_loc = temp_path();
1549        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1550        let namespace_ident = NamespaceIdent::new("a".into());
1551        create_namespace(&catalog, &namespace_ident).await;
1552
1553        assert_eq!(
1554            catalog
1555                .create_namespace(&namespace_ident, HashMap::new())
1556                .await
1557                .unwrap_err()
1558                .to_string(),
1559            format!(
1560                "Unexpected => Namespace {:?} already exists",
1561                &namespace_ident
1562            )
1563        );
1564
1565        assert_eq!(
1566            catalog.get_namespace(&namespace_ident).await.unwrap(),
1567            Namespace::with_properties(namespace_ident, default_properties())
1568        );
1569    }
1570
1571    #[tokio::test]
1572    async fn test_create_nested_namespace() {
1573        let warehouse_loc = temp_path();
1574        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1575        let parent_namespace_ident = NamespaceIdent::new("a".into());
1576        create_namespace(&catalog, &parent_namespace_ident).await;
1577
1578        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1579
1580        assert_eq!(
1581            catalog
1582                .create_namespace(&child_namespace_ident, HashMap::new())
1583                .await
1584                .unwrap(),
1585            Namespace::new(child_namespace_ident.clone())
1586        );
1587
1588        assert_eq!(
1589            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1590            Namespace::with_properties(child_namespace_ident, default_properties())
1591        );
1592    }
1593
1594    #[tokio::test]
1595    async fn test_create_deeply_nested_namespace() {
1596        let warehouse_loc = temp_path();
1597        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1598        let namespace_ident_a = NamespaceIdent::new("a".into());
1599        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1600        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1601
1602        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1603
1604        assert_eq!(
1605            catalog
1606                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1607                .await
1608                .unwrap(),
1609            Namespace::new(namespace_ident_a_b_c.clone())
1610        );
1611
1612        assert_eq!(
1613            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1614            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1615        );
1616    }
1617
1618    #[tokio::test]
1619    async fn test_update_namespace_noop() {
1620        let warehouse_loc = temp_path();
1621        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1622        let namespace_ident = NamespaceIdent::new("a".into());
1623        create_namespace(&catalog, &namespace_ident).await;
1624
1625        catalog
1626            .update_namespace(&namespace_ident, HashMap::new())
1627            .await
1628            .unwrap();
1629
1630        assert_eq!(
1631            *catalog
1632                .get_namespace(&namespace_ident)
1633                .await
1634                .unwrap()
1635                .properties(),
1636            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1637        )
1638    }
1639
1640    #[tokio::test]
1641    async fn test_update_namespace() {
1642        let warehouse_loc = temp_path();
1643        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1644        let namespace_ident = NamespaceIdent::new("a".into());
1645        create_namespace(&catalog, &namespace_ident).await;
1646
1647        let mut props = HashMap::from_iter([
1648            ("prop1".to_string(), "val1".to_string()),
1649            ("prop2".into(), "val2".into()),
1650        ]);
1651
1652        catalog
1653            .update_namespace(&namespace_ident, props.clone())
1654            .await
1655            .unwrap();
1656
1657        props.insert("exists".into(), "true".into());
1658
1659        assert_eq!(
1660            *catalog
1661                .get_namespace(&namespace_ident)
1662                .await
1663                .unwrap()
1664                .properties(),
1665            props
1666        )
1667    }
1668
1669    #[tokio::test]
1670    async fn test_update_nested_namespace() {
1671        let warehouse_loc = temp_path();
1672        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1673        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1674        create_namespace(&catalog, &namespace_ident).await;
1675
1676        let mut props = HashMap::from_iter([
1677            ("prop1".to_string(), "val1".to_string()),
1678            ("prop2".into(), "val2".into()),
1679        ]);
1680
1681        catalog
1682            .update_namespace(&namespace_ident, props.clone())
1683            .await
1684            .unwrap();
1685
1686        props.insert("exists".into(), "true".into());
1687
1688        assert_eq!(
1689            *catalog
1690                .get_namespace(&namespace_ident)
1691                .await
1692                .unwrap()
1693                .properties(),
1694            props
1695        )
1696    }
1697
1698    #[tokio::test]
1699    async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1700        let warehouse_loc = temp_path();
1701        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1702        let namespace_ident = NamespaceIdent::new("a".into());
1703
1704        let props = HashMap::from_iter([
1705            ("prop1".to_string(), "val1".to_string()),
1706            ("prop2".into(), "val2".into()),
1707        ]);
1708
1709        let err = catalog
1710            .update_namespace(&namespace_ident, props)
1711            .await
1712            .unwrap_err();
1713
1714        assert_eq!(
1715            err.message(),
1716            format!("No such namespace: {namespace_ident:?}")
1717        );
1718    }
1719
1720    #[tokio::test]
1721    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1722        let warehouse_loc = temp_path();
1723        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1724        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1725
1726        let props = HashMap::from_iter([
1727            ("prop1".to_string(), "val1".to_string()),
1728            ("prop2".into(), "val2".into()),
1729        ]);
1730
1731        let err = catalog
1732            .update_namespace(&namespace_ident, props)
1733            .await
1734            .unwrap_err();
1735
1736        assert_eq!(
1737            err.message(),
1738            format!("No such namespace: {namespace_ident:?}")
1739        );
1740    }
1741
1742    #[tokio::test]
1743    async fn test_drop_namespace() {
1744        let warehouse_loc = temp_path();
1745        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1746        let namespace_ident = NamespaceIdent::new("abc".into());
1747        create_namespace(&catalog, &namespace_ident).await;
1748
1749        catalog.drop_namespace(&namespace_ident).await.unwrap();
1750
1751        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1752    }
1753
1754    #[tokio::test]
1755    async fn test_drop_nested_namespace() {
1756        let warehouse_loc = temp_path();
1757        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1758        let namespace_ident_a = NamespaceIdent::new("a".into());
1759        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1760        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1761
1762        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1763
1764        assert!(
1765            !catalog
1766                .namespace_exists(&namespace_ident_a_b)
1767                .await
1768                .unwrap()
1769        );
1770
1771        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1772    }
1773
1774    #[tokio::test]
1775    async fn test_drop_deeply_nested_namespace() {
1776        let warehouse_loc = temp_path();
1777        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1778        let namespace_ident_a = NamespaceIdent::new("a".into());
1779        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1780        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1781        create_namespaces(&catalog, &vec![
1782            &namespace_ident_a,
1783            &namespace_ident_a_b,
1784            &namespace_ident_a_b_c,
1785        ])
1786        .await;
1787
1788        catalog
1789            .drop_namespace(&namespace_ident_a_b_c)
1790            .await
1791            .unwrap();
1792
1793        assert!(
1794            !catalog
1795                .namespace_exists(&namespace_ident_a_b_c)
1796                .await
1797                .unwrap()
1798        );
1799
1800        assert!(
1801            catalog
1802                .namespace_exists(&namespace_ident_a_b)
1803                .await
1804                .unwrap()
1805        );
1806
1807        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1808    }
1809
1810    #[tokio::test]
1811    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1812        let warehouse_loc = temp_path();
1813        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1814
1815        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1816        assert_eq!(
1817            catalog
1818                .drop_namespace(&non_existent_namespace_ident)
1819                .await
1820                .unwrap_err()
1821                .to_string(),
1822            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1823        )
1824    }
1825
1826    #[tokio::test]
1827    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1828        let warehouse_loc = temp_path();
1829        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1830        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1831
1832        let non_existent_namespace_ident =
1833            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1834        assert_eq!(
1835            catalog
1836                .drop_namespace(&non_existent_namespace_ident)
1837                .await
1838                .unwrap_err()
1839                .to_string(),
1840            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1841        )
1842    }
1843
1844    #[tokio::test]
1845    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1846        let warehouse_loc = temp_path();
1847        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1848        let namespace_ident_a = NamespaceIdent::new("a".into());
1849        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1850        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1851
1852        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1853
1854        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1855
1856        assert!(
1857            catalog
1858                .namespace_exists(&namespace_ident_a_b)
1859                .await
1860                .unwrap()
1861        );
1862    }
1863
1864    #[tokio::test]
1865    async fn test_list_tables_returns_empty_vector() {
1866        let warehouse_loc = temp_path();
1867        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1868        let namespace_ident = NamespaceIdent::new("a".into());
1869        create_namespace(&catalog, &namespace_ident).await;
1870
1871        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1872    }
1873
1874    #[tokio::test]
1875    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1876        let warehouse_loc = temp_path();
1877        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1878
1879        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1880
1881        assert_eq!(
1882            catalog
1883                .list_tables(&non_existent_namespace_ident)
1884                .await
1885                .unwrap_err()
1886                .to_string(),
1887            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1888        );
1889    }
1890
1891    #[tokio::test]
1892    async fn test_create_table_with_location() {
1893        let warehouse_loc = temp_path();
1894        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1895        let namespace_ident = NamespaceIdent::new("a".into());
1896        create_namespace(&catalog, &namespace_ident).await;
1897
1898        let table_name = "abc";
1899        let location = warehouse_loc.clone();
1900        let table_creation = TableCreation::builder()
1901            .name(table_name.into())
1902            .location(location.clone())
1903            .schema(simple_table_schema())
1904            .build();
1905
1906        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1907
1908        assert_table_eq(
1909            &catalog
1910                .create_table(&namespace_ident, table_creation)
1911                .await
1912                .unwrap(),
1913            &expected_table_ident,
1914            &simple_table_schema(),
1915        );
1916
1917        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1918
1919        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1920
1921        assert!(
1922            table
1923                .metadata_location()
1924                .unwrap()
1925                .to_string()
1926                .starts_with(&location)
1927        )
1928    }
1929
1930    #[tokio::test]
1931    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1932        let warehouse_loc = temp_path();
1933        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1934
1935        let namespace_ident = NamespaceIdent::new("a".into());
1936        let mut namespace_properties = HashMap::new();
1937        let namespace_location = temp_path();
1938        namespace_properties.insert(
1939            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1940            namespace_location.to_string(),
1941        );
1942        catalog
1943            .create_namespace(&namespace_ident, namespace_properties)
1944            .await
1945            .unwrap();
1946
1947        let table_name = "tbl1";
1948        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1949        let expected_table_metadata_location_regex =
1950            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1951
1952        let table = catalog
1953            .create_table(
1954                &namespace_ident,
1955                TableCreation::builder()
1956                    .name(table_name.into())
1957                    .schema(simple_table_schema())
1958                    // no location specified for table
1959                    .build(),
1960            )
1961            .await
1962            .unwrap();
1963        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1964        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1965
1966        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1967        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1968        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1969    }
1970
1971    #[tokio::test]
1972    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1973     {
1974        let warehouse_loc = temp_path();
1975        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1976
1977        let namespace_ident = NamespaceIdent::new("a".into());
1978        let mut namespace_properties = HashMap::new();
1979        let namespace_location = temp_path();
1980        namespace_properties.insert(
1981            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1982            namespace_location.to_string(),
1983        );
1984        catalog
1985            .create_namespace(&namespace_ident, namespace_properties)
1986            .await
1987            .unwrap();
1988
1989        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1990        let mut nested_namespace_properties = HashMap::new();
1991        let nested_namespace_location = temp_path();
1992        nested_namespace_properties.insert(
1993            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1994            nested_namespace_location.to_string(),
1995        );
1996        catalog
1997            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1998            .await
1999            .unwrap();
2000
2001        let table_name = "tbl1";
2002        let expected_table_ident =
2003            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2004        let expected_table_metadata_location_regex = format!(
2005            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
2006        );
2007
2008        let table = catalog
2009            .create_table(
2010                &nested_namespace_ident,
2011                TableCreation::builder()
2012                    .name(table_name.into())
2013                    .schema(simple_table_schema())
2014                    // no location specified for table
2015                    .build(),
2016            )
2017            .await
2018            .unwrap();
2019        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2020        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2021
2022        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2023        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2024        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2025    }
2026
2027    #[tokio::test]
2028    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2029     {
2030        let warehouse_loc = temp_path();
2031        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2032
2033        let namespace_ident = NamespaceIdent::new("a".into());
2034        // note: no location specified in namespace_properties
2035        let namespace_properties = HashMap::new();
2036        catalog
2037            .create_namespace(&namespace_ident, namespace_properties)
2038            .await
2039            .unwrap();
2040
2041        let table_name = "tbl1";
2042        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2043        let expected_table_metadata_location_regex =
2044            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2045
2046        let table = catalog
2047            .create_table(
2048                &namespace_ident,
2049                TableCreation::builder()
2050                    .name(table_name.into())
2051                    .schema(simple_table_schema())
2052                    // no location specified for table
2053                    .build(),
2054            )
2055            .await
2056            .unwrap();
2057        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2058        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2059
2060        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2061        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2062        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2063    }
2064
2065    #[tokio::test]
2066    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2067     {
2068        let warehouse_loc = temp_path();
2069        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2070
2071        let namespace_ident = NamespaceIdent::new("a".into());
2072        create_namespace(&catalog, &namespace_ident).await;
2073
2074        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2075        create_namespace(&catalog, &nested_namespace_ident).await;
2076
2077        let table_name = "tbl1";
2078        let expected_table_ident =
2079            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2080        let expected_table_metadata_location_regex =
2081            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2082
2083        let table = catalog
2084            .create_table(
2085                &nested_namespace_ident,
2086                TableCreation::builder()
2087                    .name(table_name.into())
2088                    .schema(simple_table_schema())
2089                    // no location specified for table
2090                    .build(),
2091            )
2092            .await
2093            .unwrap();
2094        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2095        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2096
2097        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2098        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2099        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2100    }
2101
2102    #[tokio::test]
2103    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2104        let warehouse_loc = temp_path();
2105        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2106        let namespace_ident = NamespaceIdent::new("a".into());
2107        create_namespace(&catalog, &namespace_ident).await;
2108        let table_name = "tbl1";
2109        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2110        create_table(&catalog, &table_ident).await;
2111
2112        let tmp_dir = TempDir::new().unwrap();
2113        let location = tmp_dir.path().to_str().unwrap().to_string();
2114
2115        assert_eq!(
2116            catalog
2117                .create_table(
2118                    &namespace_ident,
2119                    TableCreation::builder()
2120                        .name(table_name.into())
2121                        .schema(simple_table_schema())
2122                        .location(location)
2123                        .build()
2124                )
2125                .await
2126                .unwrap_err()
2127                .to_string(),
2128            format!("Unexpected => Table {:?} already exists.", &table_ident)
2129        );
2130    }
2131
2132    #[tokio::test]
2133    async fn test_rename_table_in_same_namespace() {
2134        let warehouse_loc = temp_path();
2135        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2136        let namespace_ident = NamespaceIdent::new("n1".into());
2137        create_namespace(&catalog, &namespace_ident).await;
2138        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2139        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2140        create_table(&catalog, &src_table_ident).await;
2141
2142        catalog
2143            .rename_table(&src_table_ident, &dst_table_ident)
2144            .await
2145            .unwrap();
2146
2147        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2148            dst_table_ident
2149        ],);
2150    }
2151
2152    #[tokio::test]
2153    async fn test_rename_table_across_namespaces() {
2154        let warehouse_loc = temp_path();
2155        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2156        let src_namespace_ident = NamespaceIdent::new("a".into());
2157        let dst_namespace_ident = NamespaceIdent::new("b".into());
2158        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2159        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2160        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2161        create_table(&catalog, &src_table_ident).await;
2162
2163        catalog
2164            .rename_table(&src_table_ident, &dst_table_ident)
2165            .await
2166            .unwrap();
2167
2168        assert_eq!(
2169            catalog.list_tables(&src_namespace_ident).await.unwrap(),
2170            vec![],
2171        );
2172
2173        assert_eq!(
2174            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2175            vec![dst_table_ident],
2176        );
2177    }
2178
2179    #[tokio::test]
2180    async fn test_rename_table_src_table_is_same_as_dst_table() {
2181        let warehouse_loc = temp_path();
2182        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2183        let namespace_ident = NamespaceIdent::new("n1".into());
2184        create_namespace(&catalog, &namespace_ident).await;
2185        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2186        create_table(&catalog, &table_ident).await;
2187
2188        catalog
2189            .rename_table(&table_ident, &table_ident)
2190            .await
2191            .unwrap();
2192
2193        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2194            table_ident
2195        ],);
2196    }
2197
2198    #[tokio::test]
2199    async fn test_rename_table_across_nested_namespaces() {
2200        let warehouse_loc = temp_path();
2201        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2202        let namespace_ident_a = NamespaceIdent::new("a".into());
2203        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2204        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2205        create_namespaces(&catalog, &vec![
2206            &namespace_ident_a,
2207            &namespace_ident_a_b,
2208            &namespace_ident_a_b_c,
2209        ])
2210        .await;
2211
2212        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2213        create_tables(&catalog, vec![&src_table_ident]).await;
2214
2215        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2216        catalog
2217            .rename_table(&src_table_ident, &dst_table_ident)
2218            .await
2219            .unwrap();
2220
2221        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2222
2223        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2224    }
2225
2226    #[tokio::test]
2227    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2228        let warehouse_loc = temp_path();
2229        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2230        let src_namespace_ident = NamespaceIdent::new("n1".into());
2231        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2232        create_namespace(&catalog, &src_namespace_ident).await;
2233        create_table(&catalog, &src_table_ident).await;
2234
2235        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2236        let dst_table_ident =
2237            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2238        assert_eq!(
2239            catalog
2240                .rename_table(&src_table_ident, &dst_table_ident)
2241                .await
2242                .unwrap_err()
2243                .to_string(),
2244            format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2245        );
2246    }
2247
2248    #[tokio::test]
2249    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2250        let warehouse_loc = temp_path();
2251        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2252        let namespace_ident = NamespaceIdent::new("n1".into());
2253        create_namespace(&catalog, &namespace_ident).await;
2254        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2255        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2256
2257        assert_eq!(
2258            catalog
2259                .rename_table(&src_table_ident, &dst_table_ident)
2260                .await
2261                .unwrap_err()
2262                .to_string(),
2263            format!("Unexpected => No such table: {src_table_ident:?}"),
2264        );
2265    }
2266
2267    #[tokio::test]
2268    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2269        let warehouse_loc = temp_path();
2270        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2271        let namespace_ident = NamespaceIdent::new("n1".into());
2272        create_namespace(&catalog, &namespace_ident).await;
2273        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2274        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2275        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2276
2277        assert_eq!(
2278            catalog
2279                .rename_table(&src_table_ident, &dst_table_ident)
2280                .await
2281                .unwrap_err()
2282                .to_string(),
2283            format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2284        );
2285    }
2286
2287    #[tokio::test]
2288    async fn test_drop_table_throws_error_if_table_not_exist() {
2289        let warehouse_loc = temp_path();
2290        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2291        let namespace_ident = NamespaceIdent::new("a".into());
2292        let table_name = "tbl1";
2293        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2294        create_namespace(&catalog, &namespace_ident).await;
2295
2296        let err = catalog
2297            .drop_table(&table_ident)
2298            .await
2299            .unwrap_err()
2300            .to_string();
2301        assert_eq!(
2302            err,
2303            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2304        );
2305    }
2306
2307    #[tokio::test]
2308    async fn test_drop_table() {
2309        let warehouse_loc = temp_path();
2310        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2311        let namespace_ident = NamespaceIdent::new("a".into());
2312        let table_name = "tbl1";
2313        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2314        create_namespace(&catalog, &namespace_ident).await;
2315
2316        let location = warehouse_loc.clone();
2317        let table_creation = TableCreation::builder()
2318            .name(table_name.into())
2319            .location(location.clone())
2320            .schema(simple_table_schema())
2321            .build();
2322
2323        catalog
2324            .create_table(&namespace_ident, table_creation)
2325            .await
2326            .unwrap();
2327
2328        let table = catalog.load_table(&table_ident).await.unwrap();
2329        assert_table_eq(&table, &table_ident, &simple_table_schema());
2330
2331        catalog.drop_table(&table_ident).await.unwrap();
2332        let err = catalog
2333            .load_table(&table_ident)
2334            .await
2335            .unwrap_err()
2336            .to_string();
2337        assert_eq!(
2338            err,
2339            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2340        );
2341    }
2342
2343    #[tokio::test]
2344    async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2345        let warehouse_loc = temp_path();
2346        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2347        let namespace_ident = NamespaceIdent::new("a".into());
2348        create_namespace(&catalog, &namespace_ident).await;
2349        let table_name = "tbl1";
2350        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2351        create_table(&catalog, &table_ident).await;
2352
2353        assert_eq!(
2354            catalog
2355                .register_table(&table_ident, warehouse_loc)
2356                .await
2357                .unwrap_err()
2358                .to_string(),
2359            format!("Unexpected => Table {:?} already exists.", &table_ident)
2360        );
2361    }
2362
2363    #[tokio::test]
2364    async fn test_register_table() {
2365        let warehouse_loc = temp_path();
2366        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2367        let namespace_ident = NamespaceIdent::new("a".into());
2368        create_namespace(&catalog, &namespace_ident).await;
2369
2370        let table_name = "abc";
2371        let location = warehouse_loc.clone();
2372        let table_creation = TableCreation::builder()
2373            .name(table_name.into())
2374            .location(location.clone())
2375            .schema(simple_table_schema())
2376            .build();
2377
2378        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2379        let expected_table = catalog
2380            .create_table(&namespace_ident, table_creation)
2381            .await
2382            .unwrap();
2383
2384        let metadata_location = expected_table
2385            .metadata_location()
2386            .expect("Expected metadata location to be set")
2387            .to_string();
2388
2389        assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2390
2391        let _ = catalog.drop_table(&table_ident).await;
2392
2393        let table = catalog
2394            .register_table(&table_ident, metadata_location.clone())
2395            .await
2396            .unwrap();
2397
2398        assert_eq!(table.identifier(), expected_table.identifier());
2399        assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2400    }
2401
2402    #[tokio::test]
2403    async fn test_update_table() {
2404        let warehouse_loc = temp_path();
2405        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2406
2407        // Create a test namespace and table
2408        let namespace_ident = NamespaceIdent::new("ns1".into());
2409        create_namespace(&catalog, &namespace_ident).await;
2410        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2411        create_table(&catalog, &table_ident).await;
2412
2413        let table = catalog.load_table(&table_ident).await.unwrap();
2414
2415        // Store the original metadata location for comparison
2416        let original_metadata_location = table.metadata_location().unwrap().to_string();
2417
2418        // Create a transaction to update the table
2419        let tx = Transaction::new(&table);
2420        let tx = tx
2421            .update_table_properties()
2422            .set("test_property".to_string(), "test_value".to_string())
2423            .apply(tx)
2424            .unwrap();
2425
2426        // Commit the transaction to the catalog
2427        let updated_table = tx.commit(&catalog).await.unwrap();
2428
2429        // Verify the update was successful
2430        assert_eq!(
2431            updated_table.metadata().properties().get("test_property"),
2432            Some(&"test_value".to_string())
2433        );
2434        // Verify the metadata location has been updated
2435        assert_ne!(
2436            updated_table.metadata_location().unwrap(),
2437            original_metadata_location.as_str()
2438        );
2439
2440        // Load the table again from the catalog to verify changes were persisted
2441        let reloaded = catalog.load_table(&table_ident).await.unwrap();
2442
2443        // Verify the reloaded table matches the updated table
2444        assert_eq!(
2445            reloaded.metadata().properties().get("test_property"),
2446            Some(&"test_value".to_string())
2447        );
2448        assert_eq!(
2449            reloaded.metadata_location(),
2450            updated_table.metadata_location()
2451        );
2452    }
2453}