//! `iceberg_sql_catalog` — an Iceberg catalog backed by a SQL database (crate root, lib.rs).

1use std::{
2    collections::HashMap,
3    sync::{Arc, RwLock},
4};
5
6use async_trait::async_trait;
7use iceberg_rust::{
8    catalog::{
9        commit::{
10            apply_table_updates, apply_view_updates, check_table_requirements,
11            check_view_requirements, CommitTable, CommitView, TableRequirement,
12        },
13        create::{CreateMaterializedView, CreateTable, CreateView},
14        identifier::Identifier,
15        namespace::Namespace,
16        tabular::Tabular,
17        Catalog, CatalogList,
18    },
19    error::Error as IcebergError,
20    materialized_view::MaterializedView,
21    object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
22    spec::{
23        materialized_view_metadata::MaterializedViewMetadata,
24        table_metadata::{new_metadata_location, TableMetadata},
25        tabular::TabularMetadata,
26        util::strip_prefix,
27        view_metadata::ViewMetadata,
28    },
29    table::Table,
30    view::View,
31};
32use object_store::ObjectStoreExt;
33use sqlx::{
34    any::{install_default_drivers, AnyPoolOptions, AnyRow},
35    pool::PoolOptions,
36    AnyPool, Executor, Row,
37};
38
39use crate::error::Error;
40
/// An Iceberg catalog backed by a SQL database through `sqlx`'s `Any` driver.
///
/// Table/view pointer rows live in the `iceberg_tables` table and namespace
/// properties in `iceberg_namespace_properties`; the metadata JSON documents
/// themselves are written to the configured object store.
#[derive(Debug)]
pub struct SqlCatalog {
    /// Catalog name; used as the `catalog_name` key in every SQL query.
    name: String,
    /// Connection pool over any supported sqlx backend (sqlite, postgres, ...).
    pool: AnyPool,
    /// Builder for the object store holding the metadata JSON files.
    object_store: ObjectStoreBuilder,
    /// Last-seen `(metadata_location, metadata)` per identifier; consulted by
    /// the `update_*` methods to perform compare-and-set commits.
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}
48
/// Error types for the SQL catalog.
pub mod error;
50
impl SqlCatalog {
    /// Create a catalog named `name`, connecting (lazily) to the database at
    /// `url` and writing metadata files via `object_store`.
    ///
    /// The `iceberg_tables` and `iceberg_namespace_properties` tables are
    /// created on every new pool connection if they do not already exist.
    pub async fn new(
        url: &str,
        name: &str,
        object_store: ObjectStoreBuilder,
    ) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = PoolOptions::new();

        // An in-memory sqlite database is per-connection, so the pool must be
        // capped at one connection for all queries to see the same state.
        // NOTE(review): `SqlCatalogList::new` uses `starts_with("sqlite")`
        // here instead of an exact match — confirm which is intended.
        if url == "sqlite://" {
            pool_options = pool_options.max_connections(1);
        }

        // Bootstrap the schema on each fresh connection.
        let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
            Box::pin(async move {
                connection
                    .execute(
                        "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                    )
                    .await?;
                connection
                    .execute(
                        "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                    )
                    .await?;
                Ok(())
            })
        })
        .connect_lazy(url)?;

        Ok(SqlCatalog {
            name: name.to_owned(),
            pool,
            object_store,
            cache: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    /// Wrap this catalog's pool and object store in a `SqlCatalogList`, which
    /// hands out sibling catalogs sharing the same database.
    pub fn catalog_list(&self) -> Arc<SqlCatalogList> {
        Arc::new(SqlCatalogList {
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
        })
    }
    /// Build an object store for `bucket`.
    ///
    /// NOTE(review): panics (`unwrap`) if the builder cannot produce a store
    /// for this bucket — consider propagating the error instead.
    fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
        Arc::new(self.object_store.build(bucket).unwrap())
    }
}
113
/// One decoded row of the `iceberg_tables` catalog table.
#[derive(Debug)]
struct TableRef {
    /// Dot-separated namespace the table lives in.
    table_namespace: String,
    /// Table (or view) name within the namespace.
    table_name: String,
    /// Object-store path of the current metadata JSON file.
    metadata_location: String,
    /// Previous metadata path; NULL (None) for freshly created tables.
    _previous_metadata_location: Option<String>,
}
121
122fn query_map(row: &AnyRow) -> Result<TableRef, sqlx::Error> {
123    Ok(TableRef {
124        table_namespace: row.try_get(0)?,
125        table_name: row.try_get(1)?,
126        metadata_location: row.try_get(2)?,
127        _previous_metadata_location: row.try_get::<String, _>(3).map(Some).or_else(|err| {
128            if let sqlx::Error::ColumnDecode {
129                index: _,
130                source: _,
131            } = err
132            {
133                Ok(None)
134            } else {
135                Err(err)
136            }
137        })?,
138    })
139}
140
141#[async_trait]
142impl Catalog for SqlCatalog {
143    /// Catalog name
144    fn name(&self) -> &str {
145        &self.name
146    }
    /// Create a namespace in the catalog
    ///
    /// Each property becomes one row in `iceberg_namespace_properties`; when
    /// no properties are given, a sentinel `exists = true` row is written so
    /// the namespace still shows up in `list_namespaces`. Returns the
    /// properties that were stored.
    async fn create_namespace(
        &self,
        namespace: &Namespace,
        properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace_str = namespace.to_string();
        let properties = properties.unwrap_or_default();

        // Insert namespace properties into the database
        // NOTE(review): keys/values are interpolated directly into the SQL
        // string; if they can contain quotes or untrusted input this is an
        // injection risk — prefer bound parameters.
        for (key, value) in &properties {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        // If no properties were provided, still create an entry to mark the namespace as existing
        if properties.is_empty() {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', 'exists', 'true');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        Ok(properties)
    }
    /// Drop a namespace in the catalog
    ///
    /// Not yet implemented — panics via `todo!()` when called.
    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }
    /// Load the namespace properties from the catalog
    ///
    /// Not yet implemented — panics via `todo!()` when called.
    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }
    /// Update the namespace properties in the catalog
    ///
    /// Not yet implemented — panics via `todo!()` when called.
    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }
    /// Check if a namespace exists
    ///
    /// Not yet implemented — panics via `todo!()` when called.
    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }
    /// List every table/view registered under `namespace` in this catalog.
    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let name = self.name.clone();
        let namespace = namespace.to_string();

        // NOTE(review): values are interpolated into the SQL string; prefer
        // bound parameters if names can contain quotes.
        let rows = {
            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&self.pool).await.map_err(Error::from)?
        };
        let iter = rows.iter().map(query_map);

        // Rebuild "<namespace>.<name>" identifiers from each row; decode or
        // parse failures surface as sqlx decode errors.
        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Identifier::parse(&(y.table_namespace.to_string() + "." + &y.table_name), None)
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }
    /// List the distinct namespaces recorded for this catalog.
    ///
    /// NOTE(review): the `_parent` filter is currently ignored — all
    /// namespaces are returned regardless of the argument.
    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let name = self.name.clone();

        let rows = {
            sqlx::query(&format!(
                "select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';",
            ))
            .fetch_all(&self.pool)
            .await
            .map_err(Error::from)?
        };
        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));

        // Split each stored dot-separated namespace string back into levels.
        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Namespace::try_new(&y.split('.').map(ToString::to_string).collect::<Vec<_>>())
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }
245    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
246        let catalog_name = self.name.clone();
247        let namespace = identifier.namespace().to_string();
248        let name = identifier.name().to_string();
249
250        let rows = {
251            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
252                &namespace,
253                &name)).fetch_all(&self.pool).await.map_err(Error::from)?
254        };
255        let mut iter = rows.iter().map(query_map);
256
257        Ok(iter.next().is_some())
258    }
259    async fn drop_table(&self, identifier: &Identifier) -> Result<(), IcebergError> {
260        let catalog_name = self.name.clone();
261        let namespace = identifier.namespace().to_string();
262        let name = identifier.name().to_string();
263
264        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
265                &namespace,
266                &name)).execute(&self.pool).await.map_err(Error::from)?;
267        Ok(())
268    }
269    async fn drop_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
270        let catalog_name = self.name.clone();
271        let namespace = identifier.namespace().to_string();
272        let name = identifier.name().to_string();
273
274        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
275                &namespace,
276                &name)).execute(&self.pool).await.map_err(Error::from)?;
277        Ok(())
278    }
279    async fn drop_materialized_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
280        let catalog_name = self.name.clone();
281        let namespace = identifier.namespace().to_string();
282        let name = identifier.name().to_string();
283
284        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
285                &namespace,
286                &name)).execute(&self.pool).await.map_err(Error::from)?;
287        Ok(())
288    }
    /// Load a table, view or materialized view from the catalog.
    ///
    /// Looks up the current metadata location in `iceberg_tables`, reads the
    /// metadata JSON from the object store, caches the `(location, metadata)`
    /// pair for later compare-and-set commits, and wraps the result in the
    /// appropriate `Tabular` variant.
    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let path = {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            // NOTE(review): every fetch error (not only "no rows") is mapped
            // to `CatalogNotFound` here, hiding genuine database failures —
            // confirm this is intended.
            let row = {
                sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
                    &namespace,
                    &name)).fetch_one(&self.pool).await.map_err(|_| IcebergError::CatalogNotFound)?
            };
            let row = query_map(&row).map_err(Error::from)?;

            row.metadata_location
        };

        let bucket = Bucket::from_path(&path)?;
        let object_store = self.default_object_store(bucket);

        // Fetch and deserialize the metadata JSON document.
        let bytes = object_store
            .get(&strip_prefix(&path).as_str().into())
            .await?
            .bytes()
            .await?;
        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;
        // Remember what was loaded so the `update_*` methods can validate
        // their commits against it.
        self.cache
            .write()
            .unwrap()
            .insert(identifier.clone(), (path.clone(), metadata.clone()));
        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }
339
    /// Create a new table: write its metadata file to the object store, then
    /// register the pointer row in `iceberg_tables` and prime the commit
    /// cache.
    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        let metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Best-effort version hint; failures are deliberately ignored.
        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
379
    /// Create a new view: write its metadata file to the object store, then
    /// register the pointer row in `iceberg_tables` and prime the commit
    /// cache.
    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let metadata: ViewMetadata = create_view.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Best-effort version hint; failures are deliberately ignored.
        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }
413
    /// Create a materialized view together with its backing storage table.
    ///
    /// Both metadata files are written to the object store first; the two
    /// pointer rows (view + storage table) are then inserted inside a single
    /// database transaction so they register atomically.
    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        let (create_view, create_table) = create_view.into();
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        let table_metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);

        let table_metadata_location = new_metadata_location(&table_metadata);
        // The storage table's identifier comes from the view's current version.
        let table_identifier = metadata.current_version(None)?.storage_table();
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        // Best-effort version hint; failures are deliberately ignored.
        object_store.put_version_hint(&metadata_location).await.ok();

        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;
        {
            // Register view and storage table atomically.
            let mut transaction = self.pool.begin().await.map_err(Error::from)?;
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            let table_catalog_name = self.name.clone();
            let table_namespace = table_identifier.namespace().to_string();
            let table_name = table_identifier.name().to_string();
            let table_metadata_location = table_metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{table_catalog_name}', '{table_namespace}', '{table_name}', '{table_metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            transaction.commit().await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }
465
466    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
467        let identifier = commit.identifier;
468        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
469            #[allow(clippy::if_same_then_else)]
470            if !matches!(commit.requirements[0], TableRequirement::AssertCreate) {
471                return Err(IcebergError::InvalidFormat(
472                    "Create table assertion".to_owned(),
473                ));
474            } else {
475                return Err(IcebergError::InvalidFormat(
476                    "Create table assertion".to_owned(),
477                ));
478            }
479        };
480        let (previous_metadata_location, metadata) = entry;
481
482        let bucket = Bucket::from_path(&previous_metadata_location)?;
483        let object_store = self.default_object_store(bucket);
484
485        let TabularMetadata::Table(mut metadata) = metadata else {
486            return Err(IcebergError::InvalidFormat(
487                "Table update on entity that is not a table".to_owned(),
488            ));
489        };
490        if !check_table_requirements(&commit.requirements, &metadata) {
491            return Err(IcebergError::InvalidFormat(
492                "Table requirements not valid".to_owned(),
493            ));
494        }
495        apply_table_updates(&mut metadata, commit.updates)?;
496        let metadata_location = new_metadata_location(&metadata);
497        object_store
498            .put_metadata(&metadata_location, metadata.as_ref())
499            .await?;
500        object_store.put_version_hint(&metadata_location).await.ok();
501
502        let catalog_name = self.name.clone();
503        let namespace = identifier.namespace().to_string();
504        let name = identifier.name().to_string();
505        let metadata_file_location = metadata_location.to_string();
506        let previous_metadata_file_location = previous_metadata_location.to_string();
507
508        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
509
510        self.cache.write().unwrap().insert(
511            identifier.clone(),
512            (metadata_location.clone(), metadata.clone().into()),
513        );
514
515        Ok(Table::new(
516            identifier.clone(),
517            self.clone(),
518            object_store.clone(),
519            metadata,
520        )
521        .await?)
522    }
523
    /// Commit a set of updates to a view.
    ///
    /// Requires the view to be present in the commit cache; writes the new
    /// metadata file, then compare-and-sets the pointer row on the previous
    /// metadata location.
    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        // Apply the updates in-memory, write the new metadata file, and keep
        // the resulting location; non-view cache entries are rejected.
        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                // Best-effort version hint; failures are deliberately ignored.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        // Compare-and-set on the previous metadata location.
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }
    /// Commit a set of updates to a materialized view.
    ///
    /// Requires the view to be present in the commit cache; writes the new
    /// metadata file, then compare-and-sets the pointer row on the previous
    /// metadata location.
    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<Identifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        // Apply the updates in-memory, write the new metadata file, and keep
        // the resulting location; other cache entry kinds are rejected.
        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;

                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                // Best-effort version hint; failures are deliberately ignored.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        // Compare-and-set on the previous metadata location.
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }
635
    /// Register an existing metadata file as a table in this catalog.
    ///
    /// Reads and validates the metadata JSON at `metadata_location`, inserts
    /// the pointer row, primes the commit cache, and returns the table.
    async fn register_table(
        self: Arc<Self>,
        identifier: Identifier,
        metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        let bucket = Bucket::from_path(metadata_location)?;
        let object_store = self.default_object_store(bucket);

        // Fetch and deserialize the existing metadata document.
        let metadata: TableMetadata = serde_json::from_slice(
            &object_store
                .get(&metadata_location.into())
                .await?
                .bytes()
                .await?,
        )?;

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.to_string(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
672}
673
674impl SqlCatalog {
675    pub fn duplicate(&self, name: &str) -> Self {
676        Self {
677            name: name.to_owned(),
678            pool: self.pool.clone(),
679            object_store: self.object_store.clone(),
680            cache: Arc::new(RwLock::new(HashMap::new())),
681        }
682    }
683}
684
/// A factory for `SqlCatalog` instances that share one connection pool and
/// one object-store configuration.
#[derive(Debug)]
pub struct SqlCatalogList {
    /// Shared connection pool handed to every catalog created from this list.
    pool: AnyPool,
    /// Shared object-store builder for metadata files.
    object_store: ObjectStoreBuilder,
}
690
impl SqlCatalogList {
    /// Connect to the database at `url` and build a catalog list.
    ///
    /// Unlike `SqlCatalog::new`, this connects eagerly (`connect` rather than
    /// `connect_lazy`) and caps the pool at one connection for ANY sqlite url
    /// (`starts_with("sqlite")` instead of an exact `sqlite://` match).
    pub async fn new(url: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = PoolOptions::new();

        // sqlite needs a single connection so all queries share one database
        // (in-memory sqlite is per-connection, file sqlite avoids lock races).
        if url.starts_with("sqlite") {
            pool_options = pool_options.max_connections(1);
        }

        // Bootstrap the schema on each fresh connection.
        let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
            Box::pin(async move {
                connection
                    .execute(
                        "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                    )
                    .await?;
                connection
                    .execute(
                        "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                    )
                    .await?;
                Ok(())
            })
        })
        .connect(url)
        .await?;

        Ok(SqlCatalogList { pool, object_store })
    }
}
735
736#[async_trait]
737impl CatalogList for SqlCatalogList {
738    fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
739        Some(Arc::new(SqlCatalog {
740            name: name.to_owned(),
741            pool: self.pool.clone(),
742            object_store: self.object_store.clone(),
743            cache: Arc::new(RwLock::new(HashMap::new())),
744        }))
745    }
746    async fn list_catalogs(&self) -> Vec<String> {
747        let rows = {
748            sqlx::query("select distinct catalog_name from iceberg_tables;")
749                .fetch_all(&self.pool)
750                .await
751                .map_err(Error::from)
752                .unwrap_or_default()
753        };
754        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
755
756        iter.collect::<Result<_, sqlx::Error>>()
757            .map_err(Error::from)
758            .unwrap_or_default()
759    }
760}
761
#[cfg(test)]
pub mod tests {
    use datafusion::{
        arrow::array::{Float64Array, Int64Array},
        common::tree_node::{TransformedResult, TreeNode},
        execution::SessionStateBuilder,
        prelude::SessionContext,
    };
    use datafusion_iceberg::{
        catalog::catalog::IcebergCatalog,
        planner::{iceberg_transform, IcebergQueryPlanner},
    };
    use iceberg_rust::{
        catalog::{namespace::Namespace, Catalog},
        object_store::ObjectStoreBuilder,
        spec::util::strip_prefix,
    };
    use object_store::ObjectStoreExt;
    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
    use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
    use tokio::time::sleep;

    use crate::SqlCatalog;
    use iceberg_rust::object_store::store::version_hint_content;
    use std::{sync::Arc, time::Duration};

    /// End-to-end integration test for `SqlCatalog` against real backends:
    /// starts LocalStack (S3) and Postgres containers, then drives the full
    /// DataFusion flow — CREATE SCHEMA, a CSV-backed external table, an
    /// Iceberg table, INSERT ... SELECT, and an aggregate query — and
    /// finally verifies that the `version-hint.text` object written to S3
    /// matches the metadata location held in the catalog's in-memory cache.
    ///
    /// Requires a working container runtime (Docker); not a unit test.
    #[tokio::test]
    async fn test_create_update_drop_table() {
        // LocalStack limited to the S3 service, with static test credentials
        // matching the object-store config below.
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        // Create the `warehouse` bucket inside the container via awslocal.
        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        // Poll until the bucket-creation command has terminated.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        // Postgres backing store for the SQL catalog tables.
        let postgres = Postgres::default()
            .with_db_name("postgres")
            .with_user("postgres")
            .with_password("postgres")
            .start()
            .await
            .unwrap();

        let postgres_host = postgres.get_host().await.unwrap();
        let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap();

        // NOTE(review): this re-polls the same bucket-creation `command`
        // that already completed above — it exits immediately and looks
        // like a copy-paste leftover; harmless, but confirm intent.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        // S3 object store pointed at the LocalStack endpoint; `allow_http`
        // is required because LocalStack serves plain HTTP.
        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();

        // Catalog named "warehouse", metadata in Postgres, data files in S3.
        let iceberg_catalog = Arc::new(
            SqlCatalog::new(
                &format!("postgres://postgres:postgres@{postgres_host}:{postgres_port}/postgres"),
                "warehouse",
                object_store,
            )
            .await
            .unwrap(),
        );

        // Wrap the catalog for DataFusion and register it on a context that
        // uses the Iceberg-aware query planner.
        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        // 1) Create the target namespace (schema) in the Iceberg catalog.
        let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        // `iceberg_transform` rewrites the logical plan so DDL/DML is routed
        // to the Iceberg catalog rather than DataFusion's defaults.
        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // 2) Register the TPC-H lineitem CSV fixture as a plain external
        //    table — this is the data source for the INSERT below.
        let sql = "CREATE EXTERNAL TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL, 
    L_PARTKEY BIGINT NOT NULL, 
    L_SUPPKEY BIGINT NOT NULL, 
    L_LINENUMBER INT NOT NULL, 
    L_QUANTITY DOUBLE NOT NULL, 
    L_EXTENDED_PRICE DOUBLE NOT NULL, 
    L_DISCOUNT DOUBLE NOT NULL, 
    L_TAX DOUBLE NOT NULL, 
    L_RETURNFLAG CHAR NOT NULL, 
    L_LINESTATUS CHAR NOT NULL, 
    L_SHIPDATE DATE NOT NULL, 
    L_COMMITDATE DATE NOT NULL, 
    L_RECEIPTDATE DATE NOT NULL, 
    L_SHIPINSTRUCT VARCHAR NOT NULL, 
    L_SHIPMODE VARCHAR NOT NULL, 
    L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // 3) Create the Iceberg table itself in S3, partitioned by month of
        //    the ship date.
        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem ( 
    L_ORDERKEY BIGINT NOT NULL, 
    L_PARTKEY BIGINT NOT NULL, 
    L_SUPPKEY BIGINT NOT NULL, 
    L_LINENUMBER INT NOT NULL, 
    L_QUANTITY DOUBLE NOT NULL, 
    L_EXTENDED_PRICE DOUBLE NOT NULL, 
    L_DISCOUNT DOUBLE NOT NULL, 
    L_TAX DOUBLE NOT NULL, 
    L_RETURNFLAG CHAR NOT NULL, 
    L_LINESTATUS CHAR NOT NULL, 
    L_SHIPDATE DATE NOT NULL, 
    L_COMMITDATE DATE NOT NULL, 
    L_RECEIPTDATE DATE NOT NULL, 
    L_SHIPINSTRUCT VARCHAR NOT NULL, 
    L_SHIPMODE VARCHAR NOT NULL, 
    L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // The new table should now be listed under the `tpch` namespace.
        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

        // 4) Copy the CSV fixture into the Iceberg table.
        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // 5) Aggregate back out of the Iceberg table to verify the data
        //    round-tripped through S3 correctly.
        let batches = ctx
        .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
        .await
        .expect("Failed to create plan for select")
        .collect()
        .await
        .expect("Failed to execute select query");

        // Guard against the query returning only empty batches.
        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                // Column 0 is sum(L_QUANTITY) (f64), column 1 is L_PARTKEY (i64).
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                // Spot-check two part keys against expected sums from the
                // fixture data; other keys are not asserted.
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        // 6) Read the version hint file the catalog wrote to S3 and check it
        //    agrees with the metadata location cached in the catalog.
        let object_store = iceberg_catalog
            .default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));

        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        // The cache maps Identifier -> (metadata_location, metadata); only
        // one table exists, so the first cached entry is tpch.lineitem.
        let cache = iceberg_catalog.cache.read().unwrap();
        let keys = cache.values().collect::<Vec<_>>();
        let version = version_hint_content(&keys[0].clone().0);

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), version);
    }
}