iceberg_sql_catalog/
lib.rs

use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

use async_trait::async_trait;
use iceberg_rust::{
    catalog::{
        commit::{
            apply_table_updates, apply_view_updates, check_table_requirements,
            check_view_requirements, CommitTable, CommitView,
        },
        create::{CreateMaterializedView, CreateTable, CreateView},
        identifier::Identifier,
        namespace::Namespace,
        tabular::Tabular,
        Catalog, CatalogList,
    },
    error::Error as IcebergError,
    materialized_view::MaterializedView,
    object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
    spec::{
        identifier::FullIdentifier,
        materialized_view_metadata::MaterializedViewMetadata,
        table_metadata::{new_metadata_location, TableMetadata},
        tabular::TabularMetadata,
        util::strip_prefix,
        view_metadata::ViewMetadata,
    },
    table::Table,
    view::View,
};
use object_store::ObjectStore;
use sqlx::{
    any::{install_default_drivers, AnyPoolOptions, AnyRow},
    pool::PoolOptions,
    AnyPool, Executor, Row,
};

use crate::error::Error;

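/// Iceberg catalog backed by a SQL database (via `sqlx`).
///
/// The database stores only pointers to metadata files; the metadata itself
/// lives in the object store. An in-memory cache maps each identifier to its
/// current metadata location and parsed metadata.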
#[derive(Debug)]
pub struct SqlCatalog {
    name: String,
    pool: AnyPool,
    object_store: ObjectStoreBuilder,
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}

pub mod error;

impl SqlCatalog {
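    /// Create a new [`SqlCatalog`] backed by the SQL database at `url`.
    ///
    /// A minimal usage sketch, assuming the crate is consumed as
    /// `iceberg_sql_catalog` and that `ObjectStoreBuilder` exposes a
    /// `memory()` constructor (the test below shows the S3 builder):
    ///
    /// ```no_run
    /// # use iceberg_rust::object_store::ObjectStoreBuilder;
    /// # use iceberg_sql_catalog::SqlCatalog;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let catalog = SqlCatalog::new("sqlite://", "warehouse", ObjectStoreBuilder::memory()).await?;
    /// # Ok(())
    /// # }
    /// ```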
    pub async fn new(
        url: &str,
        name: &str,
        object_store: ObjectStoreBuilder,
    ) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = PoolOptions::new();

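        // An in-memory SQLite database exists per connection, so cap the pool
        // at one connection to keep a single shared database.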
        if url == "sqlite://" {
            pool_options = pool_options.max_connections(1);
        }

        let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
            Box::pin(async move {
                connection
                    .execute(
                        "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                    )
                    .await?;
                connection
                    .execute(
                        "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                    )
                    .await?;
                Ok(())
            })
        })
        .connect_lazy(url)?;

        Ok(SqlCatalog {
            name: name.to_owned(),
            pool,
            object_store,
            cache: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    pub fn catalog_list(&self) -> Arc<SqlCatalogList> {
        Arc::new(SqlCatalogList {
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
        })
    }

    fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
        Arc::new(self.object_store.build(bucket).unwrap())
    }
}

#[derive(Debug)]
struct TableRef {
    table_namespace: String,
    table_name: String,
    metadata_location: String,
    _previous_metadata_location: Option<String>,
}

fn query_map(row: &AnyRow) -> Result<TableRef, sqlx::Error> {
    Ok(TableRef {
        table_namespace: row.try_get(0)?,
        table_name: row.try_get(1)?,
        metadata_location: row.try_get(2)?,
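        // `previous_metadata_location` may be NULL; with the `Any` driver a
        // NULL in a non-Option column surfaces as a `ColumnDecode` error, so
        // map that case to `None` instead of failing the whole row.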
        _previous_metadata_location: row.try_get::<String, _>(3).map(Some).or_else(|err| {
            if let sqlx::Error::ColumnDecode {
                index: _,
                source: _,
            } = err
            {
                Ok(None)
            } else {
                Err(err)
            }
        })?,
    })
}

#[async_trait]
impl Catalog for SqlCatalog {
    /// Catalog name
    fn name(&self) -> &str {
        &self.name
    }

    /// Create a namespace in the catalog
    async fn create_namespace(
        &self,
        namespace: &Namespace,
        properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace_str = namespace.to_string();
        let properties = properties.unwrap_or_default();

        // Insert namespace properties into the database
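        // Note: values are interpolated directly into the SQL text (no bind
        // parameters), so names and properties are assumed to be trusted.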
        for (key, value) in &properties {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        // If no properties were provided, still create an entry to mark the namespace as existing
        if properties.is_empty() {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', 'exists', 'true');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        Ok(properties)
    }

    /// Drop a namespace in the catalog
    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }

    /// Load the namespace properties from the catalog
    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }

    /// Update the namespace properties in the catalog
    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }

    /// Check if a namespace exists
    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }

    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let name = self.name.clone();
        let namespace = namespace.to_string();

        let rows = {
            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';", &name, &namespace)).fetch_all(&self.pool).await.map_err(Error::from)?
        };
        let iter = rows.iter().map(query_map);

        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Identifier::parse(&(y.table_namespace.to_string() + "." + &y.table_name), None)
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }

    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let name = self.name.clone();

        let rows = {
            sqlx::query(&format!(
                "select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';",
            ))
            .fetch_all(&self.pool)
            .await
            .map_err(Error::from)?
        };
        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));

        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Namespace::try_new(&y.split('.').map(ToString::to_string).collect::<Vec<_>>())
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }

    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        let rows = {
            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", &catalog_name,
                &namespace,
                &name)).fetch_all(&self.pool).await.map_err(Error::from)?
        };
        let mut iter = rows.iter().map(query_map);

        Ok(iter.next().is_some())
    }

    async fn drop_table(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", &catalog_name,
                &namespace,
                &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }

    async fn drop_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", &catalog_name,
                &namespace,
                &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }

    async fn drop_materialized_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", &catalog_name,
                &namespace,
                &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }

    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let path = {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            let row = {
                sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';", &catalog_name,
                    &namespace,
                    &name)).fetch_one(&self.pool).await.map_err(|_| IcebergError::CatalogNotFound)?
            };
            let row = query_map(&row).map_err(Error::from)?;

            row.metadata_location
        };

        let bucket = Bucket::from_path(&path)?;
        let object_store = self.default_object_store(bucket);

        let bytes = object_store
            .get(&strip_prefix(&path).as_str().into())
            .await?
            .bytes()
            .await?;
        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;
        self.cache
            .write()
            .unwrap()
            .insert(identifier.clone(), (path.clone(), metadata.clone()));
        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }

    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        let metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

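        // Writing the version hint is best-effort; errors are deliberately
        // ignored via `.ok()`.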
        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let metadata: ViewMetadata = create_view.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }

    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        let (create_view, create_table) = create_view.into();
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        let table_metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);

        let table_metadata_location = new_metadata_location(&table_metadata);
        let table_identifier = metadata.current_version(None)?.storage_table();
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        object_store.put_version_hint(&metadata_location).await.ok();

        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;
        {
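            // Register the materialized view and its backing storage table in
            // one transaction so that both rows become visible atomically.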
            let mut transaction = self.pool.begin().await.map_err(Error::from)?;
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            let table_catalog_name = self.name.clone();
            let table_namespace = table_identifier.namespace().to_string();
            let table_name = table_identifier.name().to_string();
            let table_metadata_location = table_metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{table_catalog_name}', '{table_namespace}', '{table_name}', '{table_metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            transaction.commit().await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }

    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            // Without a cached entry there is no metadata to update; this holds
            // whether or not the commit carries an `AssertCreate` requirement.
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let TabularMetadata::Table(mut metadata) = metadata else {
            return Err(IcebergError::InvalidFormat(
                "Table update on entity that is not a table".to_owned(),
            ));
        };
        if !check_table_requirements(&commit.requirements, &metadata) {
            return Err(IcebergError::InvalidFormat(
                "Table requirements not valid".to_owned(),
            ));
        }
        apply_table_updates(&mut metadata, commit.updates)?;
        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        object_store.put_version_hint(&metadata_location).await.ok();

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

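        // Predicating on the previous metadata_location makes this update a
        // compare-and-swap: the row only changes if no concurrent commit has
        // already replaced the metadata.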
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "View metadata not found in cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }

    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<FullIdentifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Materialized view metadata not found in cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;

                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }

    async fn register_table(
        self: Arc<Self>,
        identifier: Identifier,
        metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        let bucket = Bucket::from_path(metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata: TableMetadata = serde_json::from_slice(
            &object_store
                .get(&metadata_location.into())
                .await?
                .bytes()
                .await?,
        )?;

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.to_string(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
}

impl SqlCatalog {
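    /// Clone this catalog under a different name, sharing the connection pool
    /// and object store but starting with a fresh metadata cache.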
    pub fn duplicate(&self, name: &str) -> Self {
        Self {
            name: name.to_owned(),
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

#[derive(Debug)]
pub struct SqlCatalogList {
    pool: AnyPool,
    object_store: ObjectStoreBuilder,
}

impl SqlCatalogList {
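    /// Create a new `SqlCatalogList` whose catalogs share one connection pool.
    ///
    /// A minimal sketch, under the same assumptions as [`SqlCatalog::new`]
    /// (in-memory SQLite, assumed `ObjectStoreBuilder::memory()` constructor):
    ///
    /// ```no_run
    /// # use iceberg_rust::{catalog::CatalogList, object_store::ObjectStoreBuilder};
    /// # use iceberg_sql_catalog::SqlCatalogList;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let list = SqlCatalogList::new("sqlite://", ObjectStoreBuilder::memory()).await?;
    /// let catalog = list.catalog("warehouse");
    /// # Ok(())
    /// # }
    /// ```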
    pub async fn new(url: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = PoolOptions::new();

        if url.starts_with("sqlite") {
            pool_options = pool_options.max_connections(1);
        }

        let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
            Box::pin(async move {
                connection
                    .execute(
                        "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                    )
                    .await?;
                connection
                    .execute(
                        "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                    )
                    .await?;
                Ok(())
            })
        })
        .connect(url)
        .await?;

        Ok(SqlCatalogList { pool, object_store })
    }
}

#[async_trait]
impl CatalogList for SqlCatalogList {
    fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
        Some(Arc::new(SqlCatalog {
            name: name.to_owned(),
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }))
    }

    async fn list_catalogs(&self) -> Vec<String> {
        let rows = {
            sqlx::query("select distinct catalog_name from iceberg_tables;")
                .fetch_all(&self.pool)
                .await
                .map_err(Error::from)
                .unwrap_or_default()
        };
        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));

        iter.collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)
            .unwrap_or_default()
    }
}

#[cfg(test)]
pub mod tests {
    use std::{sync::Arc, time::Duration};

    use datafusion::{
        arrow::array::{Float64Array, Int64Array},
        common::tree_node::{TransformedResult, TreeNode},
        execution::SessionStateBuilder,
        prelude::SessionContext,
    };
    use datafusion_iceberg::{
        catalog::catalog::IcebergCatalog,
        planner::{iceberg_transform, IcebergQueryPlanner},
    };
    use iceberg_rust::{
        catalog::{namespace::Namespace, Catalog},
        object_store::{store::version_hint_content, ObjectStoreBuilder},
        spec::util::strip_prefix,
    };
    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
    use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
    use tokio::time::sleep;

    use crate::SqlCatalog;

    #[tokio::test]
    async fn test_create_update_drop_table() {
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let postgres = Postgres::default()
            .with_db_name("postgres")
            .with_user("postgres")
            .with_password("postgres")
            .start()
            .await
            .unwrap();

        let postgres_host = postgres.get_host().await.unwrap();
        let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap();

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();

        let iceberg_catalog = Arc::new(
            SqlCatalog::new(
                &format!("postgres://postgres:postgres@{postgres_host}:{postgres_port}/postgres"),
                "warehouse",
                object_store,
            )
            .await
            .unwrap(),
        );

        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        let sql = "CREATE SCHEMA warehouse.tpch;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER INT NOT NULL,
    L_QUANTITY DOUBLE NOT NULL,
    L_EXTENDED_PRICE DOUBLE NOT NULL,
    L_DISCOUNT DOUBLE NOT NULL,
    L_TAX DOUBLE NOT NULL,
    L_RETURNFLAG CHAR NOT NULL,
    L_LINESTATUS CHAR NOT NULL,
    L_SHIPDATE DATE NOT NULL,
    L_COMMITDATE DATE NOT NULL,
    L_RECEIPTDATE DATE NOT NULL,
    L_SHIPINSTRUCT VARCHAR NOT NULL,
    L_SHIPMODE VARCHAR NOT NULL,
    L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER INT NOT NULL,
    L_QUANTITY DOUBLE NOT NULL,
    L_EXTENDED_PRICE DOUBLE NOT NULL,
    L_DISCOUNT DOUBLE NOT NULL,
    L_TAX DOUBLE NOT NULL,
    L_RETURNFLAG CHAR NOT NULL,
    L_LINESTATUS CHAR NOT NULL,
    L_SHIPDATE DATE NOT NULL,
    L_COMMITDATE DATE NOT NULL,
    L_RECEIPTDATE DATE NOT NULL,
    L_SHIPINSTRUCT VARCHAR NOT NULL,
    L_SHIPMODE VARCHAR NOT NULL,
    L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let batches = ctx
            .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
            .await
            .expect("Failed to create plan for select")
            .collect()
            .await
            .expect("Failed to execute select query");

        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        let object_store = iceberg_catalog
            .default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));

        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        let cache = iceberg_catalog.cache.read().unwrap();
        let entries = cache.values().collect::<Vec<_>>();
        let version = version_hint_content(&entries[0].0);

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), version);
    }
}