iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::time::Duration;
20
21use async_trait::async_trait;
22use iceberg::io::FileIO;
23use iceberg::spec::{TableMetadata, TableMetadataBuilder};
24use iceberg::table::Table;
25use iceberg::{
26    Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
27    TableCreation, TableIdent,
28};
29use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
30use sqlx::{Any, AnyPool, Row, Transaction};
31use typed_builder::TypedBuilder;
32
33use crate::error::{
34    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
// ---- Catalog table: one row per catalog entry -------------------------------

/// Name of the SQL table that stores catalog entries.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
/// Column holding the name of the catalog that owns a row.
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
/// Column holding the table name.
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
/// Column holding the dot-joined namespace of the table.
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
/// Column holding the location of the current metadata file.
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
/// Column holding the location of the previous metadata file.
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
/// Column holding the record type of a row.
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
/// Record-type value written for table rows.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// ---- Namespace table: one row per (namespace, property key) -----------------

/// Name of the SQL table that stores namespace properties.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
/// Column holding the dot-joined namespace name.
const NAMESPACE_FIELD_NAME: &str = "namespace";
/// Column holding a namespace property key.
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
/// Column holding a namespace property value.
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

/// Namespace property key consulted for a namespace-specific table location.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";

// ---- Connection pool defaults ------------------------------------------------

/// Default maximum number of connections in the SQL pool when none is configured.
const MAX_CONNECTIONS: u32 = 10;
/// Default idle timeout per connection, in seconds, before it is closed.
const IDLE_TIMEOUT: u64 = 10;
/// By default, health-check each connection before handing it out.
const TEST_BEFORE_ACQUIRE: bool = true;
56
/// A struct representing the SQL catalog configuration.
///
/// This struct contains various parameters that are used to configure a SQL catalog,
/// such as the database URI, warehouse location, and file I/O settings.
/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
/// The options available for this parameter include:
/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
/// - `SqlBindStyle::QMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
///
/// `props` may carry connection-pool settings read by `SqlCatalog::new`:
/// `pool.max-connections`, `pool.idle-timeout` (seconds) and `pool.test-before-acquire`.
#[derive(Debug, TypedBuilder)]
pub struct SqlCatalogConfig {
    // Connection URI handed to the SQL connection pool.
    uri: String,
    // Catalog name; used as the `catalog_name` value in every query.
    name: String,
    // Root location used for tables when neither the table nor its namespace sets one.
    warehouse_location: String,
    // File I/O used to read and write table metadata files.
    file_io: FileIO,
    // Placeholder style expected by the target database.
    sql_bind_style: SqlBindStyle,
    // Optional extra properties (pool settings); defaults to an empty map.
    #[builder(default)]
    props: HashMap<String, String>,
}
75
/// Sql catalog implementation.
///
/// Stores table and namespace records in two SQL tables reached through a
/// `sqlx` `Any` connection pool, and table metadata files through `FileIO`.
#[derive(Debug)]
pub struct SqlCatalog {
    // Catalog name; every query is filtered by it.
    name: String,
    // Connection pool used for all statements.
    connection: AnyPool,
    // Fallback root location for tables created without an explicit location.
    warehouse_location: String,
    // File I/O used to read and write table metadata.
    fileio: FileIO,
    // Placeholder style used when rewriting `?` placeholders.
    sql_bind_style: SqlBindStyle,
}
85
/// Set the SQL parameter bind style to either `$1..$N` (Postgres style) or `?` (SQLite/MySQL/MariaDB).
///
/// The enum carries no data, so it is cheap to copy and supports full equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlBindStyle {
    /// DollarNumeric uses parameters of the form `$1..$N`, which is the Postgres style
    DollarNumeric,
    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
    QMark,
}
94
95impl SqlCatalog {
96    /// Create new sql catalog instance
97    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
98        install_default_drivers();
99        let max_connections: u32 = config
100            .props
101            .get("pool.max-connections")
102            .map(|v| v.parse().unwrap())
103            .unwrap_or(MAX_CONNECTIONS);
104        let idle_timeout: u64 = config
105            .props
106            .get("pool.idle-timeout")
107            .map(|v| v.parse().unwrap())
108            .unwrap_or(IDLE_TIMEOUT);
109        let test_before_acquire: bool = config
110            .props
111            .get("pool.test-before-acquire")
112            .map(|v| v.parse().unwrap())
113            .unwrap_or(TEST_BEFORE_ACQUIRE);
114
115        let pool = AnyPoolOptions::new()
116            .max_connections(max_connections)
117            .idle_timeout(Duration::from_secs(idle_timeout))
118            .test_before_acquire(test_before_acquire)
119            .connect(&config.uri)
120            .await
121            .map_err(from_sqlx_error)?;
122
123        sqlx::query(&format!(
124            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
125                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
126                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
127                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
128                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
129                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
130                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
131                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
132        ))
133        .execute(&pool)
134        .await
135        .map_err(from_sqlx_error)?;
136
137        sqlx::query(&format!(
138            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
139                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
140                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
141                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
142                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
143                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
144        ))
145        .execute(&pool)
146        .await
147        .map_err(from_sqlx_error)?;
148
149        Ok(SqlCatalog {
150            name: config.name.to_owned(),
151            connection: pool,
152            warehouse_location: config.warehouse_location,
153            fileio: config.file_io,
154            sql_bind_style: config.sql_bind_style,
155        })
156    }
157
158    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
159    fn replace_placeholders(&self, query: &str) -> String {
160        match self.sql_bind_style {
161            SqlBindStyle::DollarNumeric => {
162                let mut count = 1;
163                query
164                    .chars()
165                    .fold(String::with_capacity(query.len()), |mut acc, c| {
166                        if c == '?' {
167                            acc.push('$');
168                            acc.push_str(&count.to_string());
169                            count += 1;
170                        } else {
171                            acc.push(c);
172                        }
173                        acc
174                    })
175            }
176            _ => query.to_owned(),
177        }
178    }
179
180    /// Fetch a vec of AnyRows from a given query
181    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
182        let query_with_placeholders = self.replace_placeholders(query);
183
184        let mut sqlx_query = sqlx::query(&query_with_placeholders);
185        for arg in args {
186            sqlx_query = sqlx_query.bind(arg);
187        }
188
189        sqlx_query
190            .fetch_all(&self.connection)
191            .await
192            .map_err(from_sqlx_error)
193    }
194
195    /// Execute statements in a transaction, provided or not
196    async fn execute(
197        &self,
198        query: &str,
199        args: Vec<Option<&str>>,
200        transaction: Option<&mut Transaction<'_, Any>>,
201    ) -> Result<AnyQueryResult> {
202        let query_with_placeholders = self.replace_placeholders(query);
203
204        let mut sqlx_query = sqlx::query(&query_with_placeholders);
205        for arg in args {
206            sqlx_query = sqlx_query.bind(arg);
207        }
208
209        match transaction {
210            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
211            None => {
212                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
213                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
214                let _ = tx.commit().await.map_err(from_sqlx_error);
215                result
216            }
217        }
218    }
219}
220
221#[async_trait]
222impl Catalog for SqlCatalog {
223    async fn list_namespaces(
224        &self,
225        parent: Option<&NamespaceIdent>,
226    ) -> Result<Vec<NamespaceIdent>> {
227        // UNION will remove duplicates.
228        let all_namespaces_stmt = format!(
229            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
230             FROM {CATALOG_TABLE_NAME}
231             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
232             UNION
233             SELECT {NAMESPACE_FIELD_NAME}
234             FROM {NAMESPACE_TABLE_NAME}
235             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
236        );
237
238        let namespace_rows = self
239            .fetch_rows(&all_namespaces_stmt, vec![
240                Some(&self.name),
241                Some(&self.name),
242            ])
243            .await?;
244
245        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
246
247        if let Some(parent) = parent {
248            if self.namespace_exists(parent).await? {
249                let parent_str = parent.join(".");
250
251                for row in namespace_rows.iter() {
252                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
253                    // if parent = a, then we only want to see a.b, a.c returned.
254                    if nsp != parent_str && nsp.starts_with(&parent_str) {
255                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
256                    }
257                }
258
259                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
260            } else {
261                no_such_namespace_err(parent)
262            }
263        } else {
264            for row in namespace_rows.iter() {
265                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
266                let mut levels = nsp.split(".").collect::<Vec<&str>>();
267                if !levels.is_empty() {
268                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
269                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
270                }
271            }
272
273            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
274        }
275    }
276
277    async fn create_namespace(
278        &self,
279        namespace: &NamespaceIdent,
280        properties: HashMap<String, String>,
281    ) -> Result<Namespace> {
282        let exists = self.namespace_exists(namespace).await?;
283
284        if exists {
285            return Err(Error::new(
286                iceberg::ErrorKind::Unexpected,
287                format!("Namespace {:?} already exists", namespace),
288            ));
289        }
290
291        let namespace_str = namespace.join(".");
292        let insert = format!(
293            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
294             VALUES (?, ?, ?, ?)");
295        if !properties.is_empty() {
296            let mut insert_properties = properties.clone();
297            insert_properties.insert("exists".to_string(), "true".to_string());
298
299            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
300            let mut insert_stmt = insert.clone();
301            for (index, (key, value)) in insert_properties.iter().enumerate() {
302                query_args.extend_from_slice(&[
303                    Some(self.name.as_str()),
304                    Some(namespace_str.as_str()),
305                    Some(key.as_str()),
306                    Some(value.as_str()),
307                ]);
308                if index > 0 {
309                    insert_stmt.push_str(", (?, ?, ?, ?)");
310                }
311            }
312
313            self.execute(&insert_stmt, query_args, None).await?;
314
315            Ok(Namespace::with_properties(
316                namespace.clone(),
317                insert_properties,
318            ))
319        } else {
320            // set a default property of exists = true
321            self.execute(
322                &insert,
323                vec![
324                    Some(&self.name),
325                    Some(&namespace_str),
326                    Some("exists"),
327                    Some("true"),
328                ],
329                None,
330            )
331            .await?;
332            Ok(Namespace::with_properties(namespace.clone(), properties))
333        }
334    }
335
336    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
337        let exists = self.namespace_exists(namespace).await?;
338        if exists {
339            let namespace_props = self
340                .fetch_rows(
341                    &format!(
342                        "SELECT
343                            {NAMESPACE_FIELD_NAME},
344                            {NAMESPACE_FIELD_PROPERTY_KEY},
345                            {NAMESPACE_FIELD_PROPERTY_VALUE}
346                            FROM {NAMESPACE_TABLE_NAME}
347                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
348                            AND {NAMESPACE_FIELD_NAME} = ?"
349                    ),
350                    vec![Some(&self.name), Some(&namespace.join("."))],
351                )
352                .await?;
353
354            let mut properties = HashMap::with_capacity(namespace_props.len());
355
356            for row in namespace_props {
357                let key = row
358                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
359                    .map_err(from_sqlx_error)?;
360                let value = row
361                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
362                    .map_err(from_sqlx_error)?;
363
364                properties.insert(key, value);
365            }
366
367            Ok(Namespace::with_properties(namespace.clone(), properties))
368        } else {
369            no_such_namespace_err(namespace)
370        }
371    }
372
373    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
374        let namespace_str = namespace.join(".");
375
376        let table_namespaces = self
377            .fetch_rows(
378                &format!(
379                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
380                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
381                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
382                     LIMIT 1"
383                ),
384                vec![Some(&self.name), Some(&namespace_str)],
385            )
386            .await?;
387
388        if !table_namespaces.is_empty() {
389            Ok(true)
390        } else {
391            let namespaces = self
392                .fetch_rows(
393                    &format!(
394                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
395                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
396                          AND {NAMESPACE_FIELD_NAME} = ?
397                         LIMIT 1"
398                    ),
399                    vec![Some(&self.name), Some(&namespace_str)],
400                )
401                .await?;
402            if !namespaces.is_empty() {
403                Ok(true)
404            } else {
405                Ok(false)
406            }
407        }
408    }
409
410    async fn update_namespace(
411        &self,
412        namespace: &NamespaceIdent,
413        properties: HashMap<String, String>,
414    ) -> Result<()> {
415        let exists = self.namespace_exists(namespace).await?;
416        if exists {
417            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
418            let namespace_str = namespace.join(".");
419
420            let mut updates = vec![];
421            let mut inserts = vec![];
422
423            for (key, value) in properties.iter() {
424                if existing_properties.contains_key(key) {
425                    if existing_properties.get(key) != Some(value) {
426                        updates.push((key, value));
427                    }
428                } else {
429                    inserts.push((key, value));
430                }
431            }
432
433            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
434            let update_stmt = format!(
435                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
436                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
437                 AND {NAMESPACE_FIELD_NAME} = ?
438                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
439            );
440
441            let insert_stmt = format!(
442                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
443                 VALUES (?, ?, ?, ?)"
444            );
445
446            for (key, value) in updates {
447                self.execute(
448                    &update_stmt,
449                    vec![
450                        Some(value),
451                        Some(&self.name),
452                        Some(&namespace_str),
453                        Some(key),
454                    ],
455                    Some(&mut tx),
456                )
457                .await?;
458            }
459
460            for (key, value) in inserts {
461                self.execute(
462                    &insert_stmt,
463                    vec![
464                        Some(&self.name),
465                        Some(&namespace_str),
466                        Some(key),
467                        Some(value),
468                    ],
469                    Some(&mut tx),
470                )
471                .await?;
472            }
473
474            let _ = tx.commit().await.map_err(from_sqlx_error)?;
475
476            Ok(())
477        } else {
478            no_such_namespace_err(namespace)
479        }
480    }
481
482    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
483        let exists = self.namespace_exists(namespace).await?;
484        if exists {
485            // if there are tables in the namespace, don't allow drop.
486            let tables = self.list_tables(namespace).await?;
487            if !tables.is_empty() {
488                return Err(Error::new(
489                    iceberg::ErrorKind::Unexpected,
490                    format!(
491                        "Namespace {:?} is not empty. {} tables exist.",
492                        namespace,
493                        tables.len()
494                    ),
495                ));
496            }
497
498            self.execute(
499                &format!(
500                    "DELETE FROM {NAMESPACE_TABLE_NAME}
501                     WHERE {NAMESPACE_FIELD_NAME} = ?
502                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
503                ),
504                vec![Some(&namespace.join(".")), Some(&self.name)],
505                None,
506            )
507            .await?;
508
509            Ok(())
510        } else {
511            no_such_namespace_err(namespace)
512        }
513    }
514
515    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
516        let exists = self.namespace_exists(namespace).await?;
517        if exists {
518            let rows = self
519                .fetch_rows(
520                    &format!(
521                        "SELECT {CATALOG_FIELD_TABLE_NAME},
522                                {CATALOG_FIELD_TABLE_NAMESPACE}
523                         FROM {CATALOG_TABLE_NAME}
524                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
525                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
526                          AND (
527                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
528                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
529                          )",
530                    ),
531                    vec![Some(&namespace.join(".")), Some(&self.name)],
532                )
533                .await?;
534
535            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
536
537            for row in rows.iter() {
538                let tbl = row
539                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
540                    .map_err(from_sqlx_error)?;
541                let ns_strs = row
542                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
543                    .map_err(from_sqlx_error)?;
544                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
545                tables.insert(TableIdent::new(ns, tbl));
546            }
547
548            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
549        } else {
550            no_such_namespace_err(namespace)
551        }
552    }
553
554    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
555        let namespace = identifier.namespace().join(".");
556        let table_name = identifier.name();
557        let table_counts = self
558            .fetch_rows(
559                &format!(
560                    "SELECT 1
561                     FROM {CATALOG_TABLE_NAME}
562                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
563                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
564                      AND {CATALOG_FIELD_TABLE_NAME} = ?
565                      AND (
566                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
567                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
568                      )"
569                ),
570                vec![Some(&namespace), Some(&self.name), Some(table_name)],
571            )
572            .await?;
573
574        if !table_counts.is_empty() {
575            Ok(true)
576        } else {
577            Ok(false)
578        }
579    }
580
581    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
582        if !self.table_exists(identifier).await? {
583            return no_such_table_err(identifier);
584        }
585
586        self.execute(
587            &format!(
588                "DELETE FROM {CATALOG_TABLE_NAME}
589                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
590                  AND {CATALOG_FIELD_TABLE_NAME} = ?
591                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
592                  AND (
593                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
594                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
595                  )"
596            ),
597            vec![
598                Some(&self.name),
599                Some(identifier.name()),
600                Some(&identifier.namespace().join(".")),
601            ],
602            None,
603        )
604        .await?;
605
606        Ok(())
607    }
608
609    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
610        if !self.table_exists(identifier).await? {
611            return no_such_table_err(identifier);
612        }
613
614        let rows = self
615            .fetch_rows(
616                &format!(
617                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
618                     FROM {CATALOG_TABLE_NAME}
619                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
620                      AND {CATALOG_FIELD_TABLE_NAME} = ?
621                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
622                      AND (
623                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
624                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
625                      )"
626                ),
627                vec![
628                    Some(&self.name),
629                    Some(identifier.name()),
630                    Some(&identifier.namespace().join(".")),
631                ],
632            )
633            .await?;
634
635        if rows.is_empty() {
636            return no_such_table_err(identifier);
637        }
638
639        let row = &rows[0];
640        let tbl_metadata_location = row
641            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
642            .map_err(from_sqlx_error)?;
643
644        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
645
646        Ok(Table::builder()
647            .file_io(self.fileio.clone())
648            .identifier(identifier.clone())
649            .metadata_location(tbl_metadata_location)
650            .metadata(metadata)
651            .build()?)
652    }
653
654    async fn create_table(
655        &self,
656        namespace: &NamespaceIdent,
657        creation: TableCreation,
658    ) -> Result<Table> {
659        if !self.namespace_exists(namespace).await? {
660            return no_such_namespace_err(namespace);
661        }
662
663        let tbl_name = creation.name.clone();
664        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
665
666        if self.table_exists(&tbl_ident).await? {
667            return table_already_exists_err(&tbl_ident);
668        }
669
670        let (tbl_creation, location) = match creation.location.clone() {
671            Some(location) => (creation, location),
672            None => {
673                // fall back to namespace-specific location
674                // and then to warehouse location
675                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
676                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
677                    Some(location) => location.clone(),
678                    None => {
679                        format!(
680                            "{}/{}",
681                            self.warehouse_location.clone(),
682                            namespace.join("/")
683                        )
684                    }
685                };
686
687                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
688
689                (
690                    TableCreation {
691                        location: Some(tbl_location.clone()),
692                        ..creation
693                    },
694                    tbl_location,
695                )
696            }
697        };
698
699        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
700            .build()?
701            .metadata;
702        let tbl_metadata_location =
703            MetadataLocation::new_with_table_location(location.clone()).to_string();
704
705        tbl_metadata
706            .write_to(&self.fileio, &tbl_metadata_location)
707            .await?;
708
709        self.execute(&format!(
710            "INSERT INTO {CATALOG_TABLE_NAME}
711             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
712             VALUES (?, ?, ?, ?, ?)
713            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
714
715        Ok(Table::builder()
716            .file_io(self.fileio.clone())
717            .metadata_location(tbl_metadata_location)
718            .identifier(tbl_ident)
719            .metadata(tbl_metadata)
720            .build()?)
721    }
722
723    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
724        if src == dest {
725            return Ok(());
726        }
727
728        if !self.table_exists(src).await? {
729            return no_such_table_err(src);
730        }
731
732        if !self.namespace_exists(dest.namespace()).await? {
733            return no_such_namespace_err(dest.namespace());
734        }
735
736        if self.table_exists(dest).await? {
737            return table_already_exists_err(dest);
738        }
739
740        self.execute(
741            &format!(
742                "UPDATE {CATALOG_TABLE_NAME}
743                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
744                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
745                  AND {CATALOG_FIELD_TABLE_NAME} = ?
746                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
747                  AND (
748                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
749                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
750                )"
751            ),
752            vec![
753                Some(dest.name()),
754                Some(&dest.namespace().join(".")),
755                Some(&self.name),
756                Some(src.name()),
757                Some(&src.namespace().join(".")),
758            ],
759            None,
760        )
761        .await?;
762
763        Ok(())
764    }
765
766    async fn register_table(
767        &self,
768        _table_ident: &TableIdent,
769        _metadata_location: String,
770    ) -> Result<Table> {
771        Err(Error::new(
772            ErrorKind::FeatureUnsupported,
773            "Registering a table is not supported yet",
774        ))
775    }
776
777    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
778        Err(Error::new(
779            ErrorKind::FeatureUnsupported,
780            "Updating a table is not supported yet",
781        ))
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use std::collections::{HashMap, HashSet};
788    use std::hash::Hash;
789
790    use iceberg::io::FileIOBuilder;
791    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
792    use iceberg::table::Table;
793    use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
794    use itertools::Itertools;
795    use regex::Regex;
796    use sqlx::migrate::MigrateDatabase;
797    use tempfile::TempDir;
798
799    use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
800    use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
801
802    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
803
804    fn temp_path() -> String {
805        let temp_dir = TempDir::new().unwrap();
806        temp_dir.path().to_str().unwrap().to_string()
807    }
808
/// Collect the elements of `vec` into a `HashSet`, discarding duplicates and order.
fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
    vec.into_iter().collect()
}
812
813    fn default_properties() -> HashMap<String, String> {
814        HashMap::from([("exists".to_string(), "true".to_string())])
815    }
816
817    async fn new_sql_catalog(warehouse_location: String) -> impl Catalog {
818        let sql_lite_uri = format!("sqlite:{}", temp_path());
819        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
820
821        let config = SqlCatalogConfig::builder()
822            .uri(sql_lite_uri.to_string())
823            .name("iceberg".to_string())
824            .warehouse_location(warehouse_location)
825            .file_io(FileIOBuilder::new_fs_io().build().unwrap())
826            .sql_bind_style(SqlBindStyle::QMark)
827            .build();
828
829        SqlCatalog::new(config).await.unwrap()
830    }
831
832    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
833        let _ = catalog
834            .create_namespace(namespace_ident, HashMap::new())
835            .await
836            .unwrap();
837    }
838
839    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
840        for namespace_ident in namespace_idents {
841            let _ = create_namespace(catalog, namespace_ident).await;
842        }
843    }
844
845    fn simple_table_schema() -> Schema {
846        Schema::builder()
847            .with_fields(vec![
848                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
849            ])
850            .build()
851            .unwrap()
852    }
853
854    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
855        let _ = catalog
856            .create_table(
857                &table_ident.namespace,
858                TableCreation::builder()
859                    .name(table_ident.name().into())
860                    .schema(simple_table_schema())
861                    .location(temp_path())
862                    .build(),
863            )
864            .await
865            .unwrap();
866    }
867
868    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
869        for table_ident in table_idents {
870            create_table(catalog, table_ident).await;
871        }
872    }
873
874    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
875        assert_eq!(table.identifier(), expected_table_ident);
876
877        let metadata = table.metadata();
878
879        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
880
881        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
882            .with_spec_id(0)
883            .build()
884            .unwrap();
885
886        assert_eq!(
887            metadata
888                .partition_specs_iter()
889                .map(|p| p.as_ref())
890                .collect_vec(),
891            vec![&expected_partition_spec]
892        );
893
894        let expected_sorted_order = SortOrder::builder()
895            .with_order_id(0)
896            .with_fields(vec![])
897            .build(expected_schema)
898            .unwrap();
899
900        assert_eq!(
901            metadata
902                .sort_orders_iter()
903                .map(|s| s.as_ref())
904                .collect_vec(),
905            vec![&expected_sorted_order]
906        );
907
908        assert_eq!(metadata.properties(), &HashMap::new());
909
910        assert!(!table.readonly());
911    }
912
913    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
914        let actual = table.metadata_location().unwrap().to_string();
915        let regex = Regex::new(regex_str).unwrap();
916        assert!(regex.is_match(&actual))
917    }
918
919    #[tokio::test]
920    async fn test_initialized() {
921        let warehouse_loc = temp_path();
922        new_sql_catalog(warehouse_loc.clone()).await;
923        // catalog instantiation should not fail even if tables exist
924        new_sql_catalog(warehouse_loc.clone()).await;
925        new_sql_catalog(warehouse_loc.clone()).await;
926    }
927
928    #[tokio::test]
929    async fn test_list_namespaces_returns_empty_vector() {
930        let warehouse_loc = temp_path();
931        let catalog = new_sql_catalog(warehouse_loc).await;
932
933        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
934    }
935
936    #[tokio::test]
937    async fn test_list_namespaces_returns_multiple_namespaces() {
938        let warehouse_loc = temp_path();
939        let catalog = new_sql_catalog(warehouse_loc).await;
940        let namespace_ident_1 = NamespaceIdent::new("a".into());
941        let namespace_ident_2 = NamespaceIdent::new("b".into());
942        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
943
944        assert_eq!(
945            to_set(catalog.list_namespaces(None).await.unwrap()),
946            to_set(vec![namespace_ident_1, namespace_ident_2])
947        );
948    }
949
950    #[tokio::test]
951    async fn test_list_namespaces_returns_only_top_level_namespaces() {
952        let warehouse_loc = temp_path();
953        let catalog = new_sql_catalog(warehouse_loc).await;
954        let namespace_ident_1 = NamespaceIdent::new("a".into());
955        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
956        let namespace_ident_3 = NamespaceIdent::new("b".into());
957        create_namespaces(&catalog, &vec![
958            &namespace_ident_1,
959            &namespace_ident_2,
960            &namespace_ident_3,
961        ])
962        .await;
963
964        assert_eq!(
965            to_set(catalog.list_namespaces(None).await.unwrap()),
966            to_set(vec![namespace_ident_1, namespace_ident_3])
967        );
968    }
969
970    #[tokio::test]
971    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
972        let warehouse_loc = temp_path();
973        let catalog = new_sql_catalog(warehouse_loc).await;
974        let namespace_ident_1 = NamespaceIdent::new("a".into());
975        let namespace_ident_2 = NamespaceIdent::new("b".into());
976        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
977
978        assert_eq!(
979            catalog
980                .list_namespaces(Some(&namespace_ident_1))
981                .await
982                .unwrap(),
983            vec![]
984        );
985    }
986
987    #[tokio::test]
988    async fn test_list_namespaces_returns_namespace_under_parent() {
989        let warehouse_loc = temp_path();
990        let catalog = new_sql_catalog(warehouse_loc).await;
991        let namespace_ident_1 = NamespaceIdent::new("a".into());
992        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
993        let namespace_ident_3 = NamespaceIdent::new("c".into());
994        create_namespaces(&catalog, &vec![
995            &namespace_ident_1,
996            &namespace_ident_2,
997            &namespace_ident_3,
998        ])
999        .await;
1000
1001        assert_eq!(
1002            to_set(catalog.list_namespaces(None).await.unwrap()),
1003            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1004        );
1005
1006        assert_eq!(
1007            catalog
1008                .list_namespaces(Some(&namespace_ident_1))
1009                .await
1010                .unwrap(),
1011            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1012        );
1013    }
1014
1015    #[tokio::test]
1016    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1017        let warehouse_loc = temp_path();
1018        let catalog = new_sql_catalog(warehouse_loc).await;
1019        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1020        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1021        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1022        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1023        let namespace_ident_5 = NamespaceIdent::new("b".into());
1024        create_namespaces(&catalog, &vec![
1025            &namespace_ident_1,
1026            &namespace_ident_2,
1027            &namespace_ident_3,
1028            &namespace_ident_4,
1029            &namespace_ident_5,
1030        ])
1031        .await;
1032
1033        assert_eq!(
1034            to_set(
1035                catalog
1036                    .list_namespaces(Some(&namespace_ident_1))
1037                    .await
1038                    .unwrap()
1039            ),
1040            to_set(vec![
1041                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1042                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1043                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1044            ])
1045        );
1046    }
1047
1048    #[tokio::test]
1049    async fn test_namespace_exists_returns_false() {
1050        let warehouse_loc = temp_path();
1051        let catalog = new_sql_catalog(warehouse_loc).await;
1052        let namespace_ident = NamespaceIdent::new("a".into());
1053        create_namespace(&catalog, &namespace_ident).await;
1054
1055        assert!(
1056            !catalog
1057                .namespace_exists(&NamespaceIdent::new("b".into()))
1058                .await
1059                .unwrap()
1060        );
1061    }
1062
1063    #[tokio::test]
1064    async fn test_namespace_exists_returns_true() {
1065        let warehouse_loc = temp_path();
1066        let catalog = new_sql_catalog(warehouse_loc).await;
1067        let namespace_ident = NamespaceIdent::new("a".into());
1068        create_namespace(&catalog, &namespace_ident).await;
1069
1070        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1071    }
1072
1073    #[tokio::test]
1074    async fn test_create_namespace_with_properties() {
1075        let warehouse_loc = temp_path();
1076        let catalog = new_sql_catalog(warehouse_loc).await;
1077        let namespace_ident = NamespaceIdent::new("abc".into());
1078
1079        let mut properties = default_properties();
1080        properties.insert("k".into(), "v".into());
1081
1082        assert_eq!(
1083            catalog
1084                .create_namespace(&namespace_ident, properties.clone())
1085                .await
1086                .unwrap(),
1087            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1088        );
1089
1090        assert_eq!(
1091            catalog.get_namespace(&namespace_ident).await.unwrap(),
1092            Namespace::with_properties(namespace_ident, properties)
1093        );
1094    }
1095
1096    #[tokio::test]
1097    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1098        let warehouse_loc = temp_path();
1099        let catalog = new_sql_catalog(warehouse_loc).await;
1100        let namespace_ident = NamespaceIdent::new("a".into());
1101        create_namespace(&catalog, &namespace_ident).await;
1102
1103        assert_eq!(
1104            catalog
1105                .create_namespace(&namespace_ident, HashMap::new())
1106                .await
1107                .unwrap_err()
1108                .to_string(),
1109            format!(
1110                "Unexpected => Namespace {:?} already exists",
1111                &namespace_ident
1112            )
1113        );
1114
1115        assert_eq!(
1116            catalog.get_namespace(&namespace_ident).await.unwrap(),
1117            Namespace::with_properties(namespace_ident, default_properties())
1118        );
1119    }
1120
1121    #[tokio::test]
1122    async fn test_create_nested_namespace() {
1123        let warehouse_loc = temp_path();
1124        let catalog = new_sql_catalog(warehouse_loc).await;
1125        let parent_namespace_ident = NamespaceIdent::new("a".into());
1126        create_namespace(&catalog, &parent_namespace_ident).await;
1127
1128        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1129
1130        assert_eq!(
1131            catalog
1132                .create_namespace(&child_namespace_ident, HashMap::new())
1133                .await
1134                .unwrap(),
1135            Namespace::new(child_namespace_ident.clone())
1136        );
1137
1138        assert_eq!(
1139            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1140            Namespace::with_properties(child_namespace_ident, default_properties())
1141        );
1142    }
1143
1144    #[tokio::test]
1145    async fn test_create_deeply_nested_namespace() {
1146        let warehouse_loc = temp_path();
1147        let catalog = new_sql_catalog(warehouse_loc).await;
1148        let namespace_ident_a = NamespaceIdent::new("a".into());
1149        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1150        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1151
1152        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1153
1154        assert_eq!(
1155            catalog
1156                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1157                .await
1158                .unwrap(),
1159            Namespace::new(namespace_ident_a_b_c.clone())
1160        );
1161
1162        assert_eq!(
1163            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1164            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1165        );
1166    }
1167
1168    #[tokio::test]
1169    async fn test_update_namespace_noop() {
1170        let warehouse_loc = temp_path();
1171        let catalog = new_sql_catalog(warehouse_loc).await;
1172        let namespace_ident = NamespaceIdent::new("a".into());
1173        create_namespace(&catalog, &namespace_ident).await;
1174
1175        catalog
1176            .update_namespace(&namespace_ident, HashMap::new())
1177            .await
1178            .unwrap();
1179
1180        assert_eq!(
1181            *catalog
1182                .get_namespace(&namespace_ident)
1183                .await
1184                .unwrap()
1185                .properties(),
1186            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1187        )
1188    }
1189
1190    #[tokio::test]
1191    async fn test_update_namespace() {
1192        let warehouse_loc = temp_path();
1193        let catalog = new_sql_catalog(warehouse_loc).await;
1194        let namespace_ident = NamespaceIdent::new("a".into());
1195        create_namespace(&catalog, &namespace_ident).await;
1196
1197        let mut props = HashMap::from_iter([
1198            ("prop1".to_string(), "val1".to_string()),
1199            ("prop2".into(), "val2".into()),
1200        ]);
1201
1202        catalog
1203            .update_namespace(&namespace_ident, props.clone())
1204            .await
1205            .unwrap();
1206
1207        props.insert("exists".into(), "true".into());
1208
1209        assert_eq!(
1210            *catalog
1211                .get_namespace(&namespace_ident)
1212                .await
1213                .unwrap()
1214                .properties(),
1215            props
1216        )
1217    }
1218
1219    #[tokio::test]
1220    async fn test_update_nested_namespace() {
1221        let warehouse_loc = temp_path();
1222        let catalog = new_sql_catalog(warehouse_loc).await;
1223        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1224        create_namespace(&catalog, &namespace_ident).await;
1225
1226        let mut props = HashMap::from_iter([
1227            ("prop1".to_string(), "val1".to_string()),
1228            ("prop2".into(), "val2".into()),
1229        ]);
1230
1231        catalog
1232            .update_namespace(&namespace_ident, props.clone())
1233            .await
1234            .unwrap();
1235
1236        props.insert("exists".into(), "true".into());
1237
1238        assert_eq!(
1239            *catalog
1240                .get_namespace(&namespace_ident)
1241                .await
1242                .unwrap()
1243                .properties(),
1244            props
1245        )
1246    }
1247
1248    #[tokio::test]
1249    async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1250        let warehouse_loc = temp_path();
1251        let catalog = new_sql_catalog(warehouse_loc).await;
1252        let namespace_ident = NamespaceIdent::new("a".into());
1253
1254        let props = HashMap::from_iter([
1255            ("prop1".to_string(), "val1".to_string()),
1256            ("prop2".into(), "val2".into()),
1257        ]);
1258
1259        let err = catalog
1260            .update_namespace(&namespace_ident, props)
1261            .await
1262            .unwrap_err();
1263
1264        assert_eq!(
1265            err.message(),
1266            format!("No such namespace: {:?}", namespace_ident)
1267        );
1268    }
1269
1270    #[tokio::test]
1271    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1272        let warehouse_loc = temp_path();
1273        let catalog = new_sql_catalog(warehouse_loc).await;
1274        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1275
1276        let props = HashMap::from_iter([
1277            ("prop1".to_string(), "val1".to_string()),
1278            ("prop2".into(), "val2".into()),
1279        ]);
1280
1281        let err = catalog
1282            .update_namespace(&namespace_ident, props)
1283            .await
1284            .unwrap_err();
1285
1286        assert_eq!(
1287            err.message(),
1288            format!("No such namespace: {:?}", namespace_ident)
1289        );
1290    }
1291
1292    #[tokio::test]
1293    async fn test_drop_namespace() {
1294        let warehouse_loc = temp_path();
1295        let catalog = new_sql_catalog(warehouse_loc).await;
1296        let namespace_ident = NamespaceIdent::new("abc".into());
1297        create_namespace(&catalog, &namespace_ident).await;
1298
1299        catalog.drop_namespace(&namespace_ident).await.unwrap();
1300
1301        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1302    }
1303
1304    #[tokio::test]
1305    async fn test_drop_nested_namespace() {
1306        let warehouse_loc = temp_path();
1307        let catalog = new_sql_catalog(warehouse_loc).await;
1308        let namespace_ident_a = NamespaceIdent::new("a".into());
1309        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1310        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1311
1312        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1313
1314        assert!(
1315            !catalog
1316                .namespace_exists(&namespace_ident_a_b)
1317                .await
1318                .unwrap()
1319        );
1320
1321        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1322    }
1323
1324    #[tokio::test]
1325    async fn test_drop_deeply_nested_namespace() {
1326        let warehouse_loc = temp_path();
1327        let catalog = new_sql_catalog(warehouse_loc).await;
1328        let namespace_ident_a = NamespaceIdent::new("a".into());
1329        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1330        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1331        create_namespaces(&catalog, &vec![
1332            &namespace_ident_a,
1333            &namespace_ident_a_b,
1334            &namespace_ident_a_b_c,
1335        ])
1336        .await;
1337
1338        catalog
1339            .drop_namespace(&namespace_ident_a_b_c)
1340            .await
1341            .unwrap();
1342
1343        assert!(
1344            !catalog
1345                .namespace_exists(&namespace_ident_a_b_c)
1346                .await
1347                .unwrap()
1348        );
1349
1350        assert!(
1351            catalog
1352                .namespace_exists(&namespace_ident_a_b)
1353                .await
1354                .unwrap()
1355        );
1356
1357        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1358    }
1359
1360    #[tokio::test]
1361    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1362        let warehouse_loc = temp_path();
1363        let catalog = new_sql_catalog(warehouse_loc).await;
1364
1365        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1366        assert_eq!(
1367            catalog
1368                .drop_namespace(&non_existent_namespace_ident)
1369                .await
1370                .unwrap_err()
1371                .to_string(),
1372            format!(
1373                "Unexpected => No such namespace: {:?}",
1374                non_existent_namespace_ident
1375            )
1376        )
1377    }
1378
1379    #[tokio::test]
1380    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1381        let warehouse_loc = temp_path();
1382        let catalog = new_sql_catalog(warehouse_loc).await;
1383        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1384
1385        let non_existent_namespace_ident =
1386            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1387        assert_eq!(
1388            catalog
1389                .drop_namespace(&non_existent_namespace_ident)
1390                .await
1391                .unwrap_err()
1392                .to_string(),
1393            format!(
1394                "Unexpected => No such namespace: {:?}",
1395                non_existent_namespace_ident
1396            )
1397        )
1398    }
1399
1400    #[tokio::test]
1401    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1402        let warehouse_loc = temp_path();
1403        let catalog = new_sql_catalog(warehouse_loc).await;
1404        let namespace_ident_a = NamespaceIdent::new("a".into());
1405        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1406        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1407
1408        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1409
1410        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1411
1412        assert!(
1413            catalog
1414                .namespace_exists(&namespace_ident_a_b)
1415                .await
1416                .unwrap()
1417        );
1418    }
1419
1420    #[tokio::test]
1421    async fn test_list_tables_returns_empty_vector() {
1422        let warehouse_loc = temp_path();
1423        let catalog = new_sql_catalog(warehouse_loc).await;
1424        let namespace_ident = NamespaceIdent::new("a".into());
1425        create_namespace(&catalog, &namespace_ident).await;
1426
1427        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1432        let warehouse_loc = temp_path();
1433        let catalog = new_sql_catalog(warehouse_loc).await;
1434
1435        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1436
1437        assert_eq!(
1438            catalog
1439                .list_tables(&non_existent_namespace_ident)
1440                .await
1441                .unwrap_err()
1442                .to_string(),
1443            format!(
1444                "Unexpected => No such namespace: {:?}",
1445                non_existent_namespace_ident
1446            ),
1447        );
1448    }
1449
1450    #[tokio::test]
1451    async fn test_create_table_with_location() {
1452        let warehouse_loc = temp_path();
1453        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1454        let namespace_ident = NamespaceIdent::new("a".into());
1455        create_namespace(&catalog, &namespace_ident).await;
1456
1457        let table_name = "abc";
1458        let location = warehouse_loc.clone();
1459        let table_creation = TableCreation::builder()
1460            .name(table_name.into())
1461            .location(location.clone())
1462            .schema(simple_table_schema())
1463            .build();
1464
1465        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1466
1467        assert_table_eq(
1468            &catalog
1469                .create_table(&namespace_ident, table_creation)
1470                .await
1471                .unwrap(),
1472            &expected_table_ident,
1473            &simple_table_schema(),
1474        );
1475
1476        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1477
1478        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1479
1480        assert!(
1481            table
1482                .metadata_location()
1483                .unwrap()
1484                .to_string()
1485                .starts_with(&location)
1486        )
1487    }
1488
1489    #[tokio::test]
1490    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1491        let warehouse_loc = temp_path();
1492        let catalog = new_sql_catalog(warehouse_loc).await;
1493
1494        let namespace_ident = NamespaceIdent::new("a".into());
1495        let mut namespace_properties = HashMap::new();
1496        let namespace_location = temp_path();
1497        namespace_properties.insert(
1498            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1499            namespace_location.to_string(),
1500        );
1501        catalog
1502            .create_namespace(&namespace_ident, namespace_properties)
1503            .await
1504            .unwrap();
1505
1506        let table_name = "tbl1";
1507        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1508        let expected_table_metadata_location_regex = format!(
1509            "^{}/tbl1/metadata/00000-{}.metadata.json$",
1510            namespace_location, UUID_REGEX_STR,
1511        );
1512
1513        let table = catalog
1514            .create_table(
1515                &namespace_ident,
1516                TableCreation::builder()
1517                    .name(table_name.into())
1518                    .schema(simple_table_schema())
1519                    // no location specified for table
1520                    .build(),
1521            )
1522            .await
1523            .unwrap();
1524        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1525        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1526
1527        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1528        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1529        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1530    }
1531
1532    #[tokio::test]
1533    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1534     {
1535        let warehouse_loc = temp_path();
1536        let catalog = new_sql_catalog(warehouse_loc).await;
1537
1538        let namespace_ident = NamespaceIdent::new("a".into());
1539        let mut namespace_properties = HashMap::new();
1540        let namespace_location = temp_path();
1541        namespace_properties.insert(
1542            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1543            namespace_location.to_string(),
1544        );
1545        catalog
1546            .create_namespace(&namespace_ident, namespace_properties)
1547            .await
1548            .unwrap();
1549
1550        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1551        let mut nested_namespace_properties = HashMap::new();
1552        let nested_namespace_location = temp_path();
1553        nested_namespace_properties.insert(
1554            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1555            nested_namespace_location.to_string(),
1556        );
1557        catalog
1558            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1559            .await
1560            .unwrap();
1561
1562        let table_name = "tbl1";
1563        let expected_table_ident =
1564            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1565        let expected_table_metadata_location_regex = format!(
1566            "^{}/tbl1/metadata/00000-{}.metadata.json$",
1567            nested_namespace_location, UUID_REGEX_STR,
1568        );
1569
1570        let table = catalog
1571            .create_table(
1572                &nested_namespace_ident,
1573                TableCreation::builder()
1574                    .name(table_name.into())
1575                    .schema(simple_table_schema())
1576                    // no location specified for table
1577                    .build(),
1578            )
1579            .await
1580            .unwrap();
1581        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1582        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1583
1584        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1585        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1586        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1591     {
1592        let warehouse_loc = temp_path();
1593        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1594
1595        let namespace_ident = NamespaceIdent::new("a".into());
1596        // note: no location specified in namespace_properties
1597        let namespace_properties = HashMap::new();
1598        catalog
1599            .create_namespace(&namespace_ident, namespace_properties)
1600            .await
1601            .unwrap();
1602
1603        let table_name = "tbl1";
1604        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1605        let expected_table_metadata_location_regex = format!(
1606            "^{}/a/tbl1/metadata/00000-{}.metadata.json$",
1607            warehouse_loc, UUID_REGEX_STR
1608        );
1609
1610        let table = catalog
1611            .create_table(
1612                &namespace_ident,
1613                TableCreation::builder()
1614                    .name(table_name.into())
1615                    .schema(simple_table_schema())
1616                    // no location specified for table
1617                    .build(),
1618            )
1619            .await
1620            .unwrap();
1621        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1622        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1623
1624        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1625        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1626        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1627    }
1628
1629    #[tokio::test]
1630    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1631     {
1632        let warehouse_loc = temp_path();
1633        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1634
1635        let namespace_ident = NamespaceIdent::new("a".into());
1636        create_namespace(&catalog, &namespace_ident).await;
1637
1638        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1639        create_namespace(&catalog, &nested_namespace_ident).await;
1640
1641        let table_name = "tbl1";
1642        let expected_table_ident =
1643            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1644        let expected_table_metadata_location_regex = format!(
1645            "^{}/a/b/tbl1/metadata/00000-{}.metadata.json$",
1646            warehouse_loc, UUID_REGEX_STR
1647        );
1648
1649        let table = catalog
1650            .create_table(
1651                &nested_namespace_ident,
1652                TableCreation::builder()
1653                    .name(table_name.into())
1654                    .schema(simple_table_schema())
1655                    // no location specified for table
1656                    .build(),
1657            )
1658            .await
1659            .unwrap();
1660        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1661        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1662
1663        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1664        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1665        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1666    }
1667
1668    #[tokio::test]
1669    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1670        let warehouse_loc = temp_path();
1671        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1672        let namespace_ident = NamespaceIdent::new("a".into());
1673        create_namespace(&catalog, &namespace_ident).await;
1674        let table_name = "tbl1";
1675        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1676        create_table(&catalog, &table_ident).await;
1677
1678        let tmp_dir = TempDir::new().unwrap();
1679        let location = tmp_dir.path().to_str().unwrap().to_string();
1680
1681        assert_eq!(
1682            catalog
1683                .create_table(
1684                    &namespace_ident,
1685                    TableCreation::builder()
1686                        .name(table_name.into())
1687                        .schema(simple_table_schema())
1688                        .location(location)
1689                        .build()
1690                )
1691                .await
1692                .unwrap_err()
1693                .to_string(),
1694            format!("Unexpected => Table {:?} already exists.", &table_ident)
1695        );
1696    }
1697
1698    #[tokio::test]
1699    async fn test_rename_table_in_same_namespace() {
1700        let warehouse_loc = temp_path();
1701        let catalog = new_sql_catalog(warehouse_loc).await;
1702        let namespace_ident = NamespaceIdent::new("n1".into());
1703        create_namespace(&catalog, &namespace_ident).await;
1704        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1705        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1706        create_table(&catalog, &src_table_ident).await;
1707
1708        catalog
1709            .rename_table(&src_table_ident, &dst_table_ident)
1710            .await
1711            .unwrap();
1712
1713        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1714            dst_table_ident
1715        ],);
1716    }
1717
1718    #[tokio::test]
1719    async fn test_rename_table_across_namespaces() {
1720        let warehouse_loc = temp_path();
1721        let catalog = new_sql_catalog(warehouse_loc).await;
1722        let src_namespace_ident = NamespaceIdent::new("a".into());
1723        let dst_namespace_ident = NamespaceIdent::new("b".into());
1724        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1725        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1726        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1727        create_table(&catalog, &src_table_ident).await;
1728
1729        catalog
1730            .rename_table(&src_table_ident, &dst_table_ident)
1731            .await
1732            .unwrap();
1733
1734        assert_eq!(
1735            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1736            vec![],
1737        );
1738
1739        assert_eq!(
1740            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1741            vec![dst_table_ident],
1742        );
1743    }
1744
1745    #[tokio::test]
1746    async fn test_rename_table_src_table_is_same_as_dst_table() {
1747        let warehouse_loc = temp_path();
1748        let catalog = new_sql_catalog(warehouse_loc).await;
1749        let namespace_ident = NamespaceIdent::new("n1".into());
1750        create_namespace(&catalog, &namespace_ident).await;
1751        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1752        create_table(&catalog, &table_ident).await;
1753
1754        catalog
1755            .rename_table(&table_ident, &table_ident)
1756            .await
1757            .unwrap();
1758
1759        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1760            table_ident
1761        ],);
1762    }
1763
1764    #[tokio::test]
1765    async fn test_rename_table_across_nested_namespaces() {
1766        let warehouse_loc = temp_path();
1767        let catalog = new_sql_catalog(warehouse_loc).await;
1768        let namespace_ident_a = NamespaceIdent::new("a".into());
1769        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1770        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1771        create_namespaces(&catalog, &vec![
1772            &namespace_ident_a,
1773            &namespace_ident_a_b,
1774            &namespace_ident_a_b_c,
1775        ])
1776        .await;
1777
1778        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1779        create_tables(&catalog, vec![&src_table_ident]).await;
1780
1781        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1782        catalog
1783            .rename_table(&src_table_ident, &dst_table_ident)
1784            .await
1785            .unwrap();
1786
1787        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1788
1789        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1790    }
1791
1792    #[tokio::test]
1793    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1794        let warehouse_loc = temp_path();
1795        let catalog = new_sql_catalog(warehouse_loc).await;
1796        let src_namespace_ident = NamespaceIdent::new("n1".into());
1797        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1798        create_namespace(&catalog, &src_namespace_ident).await;
1799        create_table(&catalog, &src_table_ident).await;
1800
1801        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1802        let dst_table_ident =
1803            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1804        assert_eq!(
1805            catalog
1806                .rename_table(&src_table_ident, &dst_table_ident)
1807                .await
1808                .unwrap_err()
1809                .to_string(),
1810            format!(
1811                "Unexpected => No such namespace: {:?}",
1812                non_existent_dst_namespace_ident
1813            ),
1814        );
1815    }
1816
1817    #[tokio::test]
1818    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1819        let warehouse_loc = temp_path();
1820        let catalog = new_sql_catalog(warehouse_loc).await;
1821        let namespace_ident = NamespaceIdent::new("n1".into());
1822        create_namespace(&catalog, &namespace_ident).await;
1823        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1824        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1825
1826        assert_eq!(
1827            catalog
1828                .rename_table(&src_table_ident, &dst_table_ident)
1829                .await
1830                .unwrap_err()
1831                .to_string(),
1832            format!("Unexpected => No such table: {:?}", src_table_ident),
1833        );
1834    }
1835
1836    #[tokio::test]
1837    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1838        let warehouse_loc = temp_path();
1839        let catalog = new_sql_catalog(warehouse_loc).await;
1840        let namespace_ident = NamespaceIdent::new("n1".into());
1841        create_namespace(&catalog, &namespace_ident).await;
1842        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1843        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1844        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1845
1846        assert_eq!(
1847            catalog
1848                .rename_table(&src_table_ident, &dst_table_ident)
1849                .await
1850                .unwrap_err()
1851                .to_string(),
1852            format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
1853        );
1854    }
1855
1856    #[tokio::test]
1857    async fn test_drop_table_throws_error_if_table_not_exist() {
1858        let warehouse_loc = temp_path();
1859        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1860        let namespace_ident = NamespaceIdent::new("a".into());
1861        let table_name = "tbl1";
1862        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1863        create_namespace(&catalog, &namespace_ident).await;
1864
1865        let err = catalog
1866            .drop_table(&table_ident)
1867            .await
1868            .unwrap_err()
1869            .to_string();
1870        assert_eq!(
1871            err,
1872            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
1873        );
1874    }
1875
1876    #[tokio::test]
1877    async fn test_drop_table() {
1878        let warehouse_loc = temp_path();
1879        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1880        let namespace_ident = NamespaceIdent::new("a".into());
1881        let table_name = "tbl1";
1882        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1883        create_namespace(&catalog, &namespace_ident).await;
1884
1885        let location = warehouse_loc.clone();
1886        let table_creation = TableCreation::builder()
1887            .name(table_name.into())
1888            .location(location.clone())
1889            .schema(simple_table_schema())
1890            .build();
1891
1892        catalog
1893            .create_table(&namespace_ident, table_creation)
1894            .await
1895            .unwrap();
1896
1897        let table = catalog.load_table(&table_ident).await.unwrap();
1898        assert_table_eq(&table, &table_ident, &simple_table_schema());
1899
1900        catalog.drop_table(&table_ident).await.unwrap();
1901        let err = catalog
1902            .load_table(&table_ident)
1903            .await
1904            .unwrap_err()
1905            .to_string();
1906        assert_eq!(
1907            err,
1908            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
1909        );
1910    }
1911}