use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

use async_trait::async_trait;
use iceberg_rust::{
    catalog::{
        commit::{
            apply_table_updates, apply_view_updates, check_table_requirements,
            check_view_requirements, CommitTable, CommitView,
        },
        create::{CreateMaterializedView, CreateTable, CreateView},
        identifier::Identifier,
        namespace::Namespace,
        tabular::Tabular,
        Catalog, CatalogList,
    },
    error::Error as IcebergError,
    materialized_view::MaterializedView,
    object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
    spec::{
        identifier::FullIdentifier,
        materialized_view_metadata::MaterializedViewMetadata,
        table_metadata::{new_metadata_location, TableMetadata},
        tabular::TabularMetadata,
        util::strip_prefix,
        view_metadata::ViewMetadata,
    },
    table::Table,
    view::View,
};
use object_store::ObjectStore;
use sqlx::{
    any::{install_default_drivers, AnyPoolOptions, AnyRow},
    AnyPool, Executor, Row,
};

use crate::error::Error;

/// An Iceberg catalog that stores pointers to metadata files in a SQL
/// database (any backend supported by `sqlx`'s `Any` driver).
#[derive(Debug)]
pub struct SqlCatalog {
    name: String,
    pool: AnyPool,
    object_store: ObjectStoreBuilder,
    /// Last metadata location and metadata seen per identifier; used by the
    /// `update_*` methods for optimistic concurrency control.
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}

pub mod error;

impl SqlCatalog {
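    /// Creates a catalog backed by the SQL database at `url`.
    ///
    /// The pool connects lazily, and the `iceberg_tables` and
    /// `iceberg_namespace_properties` tables are created on first connect if
    /// they do not already exist.
    ///
    /// A minimal sketch of constructing a catalog; the crate path and the
    /// in-memory object store are illustrative assumptions, not part of this
    /// file:
    ///
    /// ```no_run
    /// # use iceberg_rust::object_store::ObjectStoreBuilder;
    /// # use iceberg_sql_catalog::SqlCatalog; // assumed crate name
    /// # async fn example() {
    /// let catalog = SqlCatalog::new(
    ///     "sqlite://",                  // any sqlx-supported connection url
    ///     "warehouse",                  // name of this catalog
    ///     ObjectStoreBuilder::memory(), // where metadata files are written
    /// )
    /// .await
    /// .unwrap();
    /// # }
    /// ```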
    pub async fn new(
        url: &str,
        name: &str,
        object_store: ObjectStoreBuilder,
    ) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = AnyPoolOptions::new();

        // An in-memory SQLite database exists per connection, so the pool
        // must be limited to a single connection for all queries to see the
        // same data.
        if url == "sqlite://" {
            pool_options = pool_options.max_connections(1);
        }

        let pool = pool_options
            .after_connect(|connection, _| {
                Box::pin(async move {
                    connection
                        .execute(
                            "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                        )
                        .await?;
                    connection
                        .execute(
                            "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                        )
                        .await?;
                    Ok(())
                })
            })
            .connect_lazy(url)?;

        Ok(SqlCatalog {
            name: name.to_owned(),
            pool,
            object_store,
            cache: Arc::new(RwLock::new(HashMap::new())),
        })
    }

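    /// Returns a `SqlCatalogList` that shares this catalog's connection pool
    /// and object-store builder, so catalogs obtained from the list operate
    /// on the same underlying database.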
    pub fn catalog_list(&self) -> Arc<SqlCatalogList> {
        Arc::new(SqlCatalogList {
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
        })
    }

    fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
        Arc::new(self.object_store.build(bucket).unwrap())
    }
}

/// One row of the `iceberg_tables` catalog table.
#[derive(Debug)]
struct TableRef {
    table_namespace: String,
    table_name: String,
    metadata_location: String,
    _previous_metadata_location: Option<String>,
}

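/// Decodes one row of `iceberg_tables` into a `TableRef`; the columns are
/// expected positionally as `(table_namespace, table_name, metadata_location,
/// previous_metadata_location)`.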
fn query_map(row: &AnyRow) -> Result<TableRef, sqlx::Error> {
    Ok(TableRef {
        table_namespace: row.try_get(0)?,
        table_name: row.try_get(1)?,
        metadata_location: row.try_get(2)?,
        // `previous_metadata_location` is nullable; treat a decode error for
        // this column as an absent value instead of failing the whole row.
        _previous_metadata_location: row.try_get::<String, _>(3).map(Some).or_else(|err| {
            if matches!(err, sqlx::Error::ColumnDecode { .. }) {
                Ok(None)
            } else {
                Err(err)
            }
        })?,
    })
}

#[async_trait]
impl Catalog for SqlCatalog {
    fn name(&self) -> &str {
        &self.name
    }

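    /// Namespaces are persisted as rows in `iceberg_namespace_properties`.
    /// Because a namespace without any rows would be indistinguishable from
    /// a missing one, an empty property map is stored as a single marker row
    /// (`'exists' = 'true'`).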
    async fn create_namespace(
        &self,
        namespace: &Namespace,
        properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace_str = namespace.to_string();
        let properties = properties.unwrap_or_default();

        for (key, value) in &properties {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        // Record the namespace even when no properties were given.
        if properties.is_empty() {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', 'exists', 'true');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        Ok(properties)
    }

    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }

    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }

    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }

    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }

    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let name = self.name.clone();
        let namespace = namespace.to_string();

        let rows = sqlx::query(&format!(
            "select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{name}' and table_namespace = '{namespace}';"
        ))
        .fetch_all(&self.pool)
        .await
        .map_err(Error::from)?;

        Ok(rows
            .iter()
            .map(query_map)
            .map(|x| {
                x.and_then(|y| {
                    Identifier::parse(&(y.table_namespace + "." + &y.table_name), None)
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }

    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let name = self.name.clone();

        let rows = sqlx::query(&format!(
            "select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';"
        ))
        .fetch_all(&self.pool)
        .await
        .map_err(Error::from)?;

        Ok(rows
            .iter()
            .map(|row| row.try_get::<String, _>(0))
            .map(|x| {
                x.and_then(|y| {
                    Namespace::try_new(&y.split('.').map(ToString::to_string).collect::<Vec<_>>())
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }

    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        let rows = sqlx::query(&format!(
            "select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}';"
        ))
        .fetch_all(&self.pool)
        .await
        .map_err(Error::from)?;

        Ok(!rows.is_empty())
    }

    async fn drop_table(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!(
            "delete from iceberg_tables where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;
        Ok(())
    }

    // Views and materialized views are tracked in `iceberg_tables` as well,
    // so dropping them is identical to dropping a table.
    async fn drop_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!(
            "delete from iceberg_tables where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;
        Ok(())
    }

    async fn drop_materialized_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!(
            "delete from iceberg_tables where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;
        Ok(())
    }
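
    /// Looks up the current metadata location in `iceberg_tables`, reads and
    /// caches the metadata JSON from the object store, and constructs the
    /// matching tabular (table, view or materialized view).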
    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let path = {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            // A missing row means the tabular is not known to this catalog.
            let row = sqlx::query(&format!(
                "select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}';"
            ))
            .fetch_one(&self.pool)
            .await
            .map_err(|_| IcebergError::CatalogNotFound)?;
            let row = query_map(&row).map_err(Error::from)?;

            row.metadata_location
        };

        let bucket = Bucket::from_path(&path)?;
        let object_store = self.default_object_store(bucket);

        let bytes = object_store
            .get(&strip_prefix(&path).as_str().into())
            .await?
            .bytes()
            .await?;
        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;
        self.cache
            .write()
            .unwrap()
            .insert(identifier.clone(), (path.clone(), metadata.clone()));
        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }

    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        let metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        // Write the metadata file before registering it in the catalog.
        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // The version hint is best-effort; failing to write it is not fatal.
        object_store.put_version_hint(&metadata_location).await.ok();

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            sqlx::query(&format!(
                "insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let metadata: ViewMetadata = create_view.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            sqlx::query(&format!(
                "insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }

    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        let (create_view, create_table) = create_view.into();
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        let table_metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        let table_metadata_location = new_metadata_location(&table_metadata);
        let table_identifier = metadata.current_version(None)?.storage_table();

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        object_store.put_version_hint(&metadata_location).await.ok();

        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;

        // Register the view and its storage table atomically.
        {
            let mut transaction = self.pool.begin().await.map_err(Error::from)?;
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            sqlx::query(&format!(
                "insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');"
            ))
            .execute(&mut *transaction)
            .await
            .map_err(Error::from)?;

            let table_namespace = table_identifier.namespace().to_string();
            let table_name = table_identifier.name().to_string();

            sqlx::query(&format!(
                "insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{table_namespace}', '{table_name}', '{table_metadata_location}');"
            ))
            .execute(&mut *transaction)
            .await
            .map_err(Error::from)?;

            transaction.commit().await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }

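    /// Commits a table update against the metadata snapshot cached by a
    /// previous `load_tabular` call. The `update` statement below only
    /// matches rows whose `metadata_location` still equals the cached one,
    /// so it doubles as an optimistic compare-and-swap against concurrent
    /// writers.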
    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
        let identifier = commit.identifier;
        // Without a cached snapshot from a previous load there is nothing to
        // check the commit requirements against, so the commit is rejected.
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let TabularMetadata::Table(mut metadata) = metadata else {
            return Err(IcebergError::InvalidFormat(
                "Table update on entity that is not a table".to_owned(),
            ));
        };
        if !check_table_requirements(&commit.requirements, &metadata) {
            return Err(IcebergError::InvalidFormat(
                "Table requirements not valid".to_owned(),
            ));
        }
        apply_table_updates(&mut metadata, commit.updates)?;
        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        object_store.put_version_hint(&metadata_location).await.ok();

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        // The trailing `metadata_location = ...` predicate makes this update
        // a compare-and-swap: it only succeeds if no other writer has moved
        // the metadata pointer since this table was loaded.
        sqlx::query(&format!(
            "update iceberg_tables set metadata_location = '{metadata_location}', previous_metadata_location = '{previous_metadata_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_location}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "View to update does not exist in the catalog cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        // Compare-and-swap on the previous metadata location, as in
        // `update_table`.
        sqlx::query(&format!(
            "update iceberg_tables set metadata_location = '{metadata_location}', previous_metadata_location = '{previous_metadata_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_location}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }

    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<FullIdentifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Materialized view to update does not exist in the catalog cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;

                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        // Same compare-and-swap on the previous metadata location as in
        // `update_table`.
        sqlx::query(&format!(
            "update iceberg_tables set metadata_location = '{metadata_location}', previous_metadata_location = '{previous_metadata_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_location}';"
        ))
        .execute(&self.pool)
        .await
        .map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }

    async fn register_table(
        self: Arc<Self>,
        identifier: Identifier,
        metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        let bucket = Bucket::from_path(metadata_location)?;
        let object_store = self.default_object_store(bucket);

        // Read the existing metadata file; registration only adds a catalog
        // entry pointing at it.
        let metadata: TableMetadata = serde_json::from_slice(
            &object_store
                .get(&metadata_location.into())
                .await?
                .bytes()
                .await?,
        )?;

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            sqlx::query(&format!(
                "insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.to_string(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
}

impl SqlCatalog {
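    /// Returns a catalog with the given name that shares this catalog's
    /// connection pool and object-store builder but starts with an empty
    /// metadata cache.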
    pub fn duplicate(&self, name: &str) -> Self {
        Self {
            name: name.to_owned(),
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

/// A list of [`SqlCatalog`]s sharing one connection pool.
#[derive(Debug)]
pub struct SqlCatalogList {
    pool: AnyPool,
    object_store: ObjectStoreBuilder,
}

impl SqlCatalogList {
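    /// Creates a catalog list backed by the database at `url`, creating the
    /// catalog tables on connect if necessary. Unlike [`SqlCatalog::new`],
    /// this connects eagerly, so connection errors surface immediately.
    ///
    /// A minimal sketch, under the same illustrative assumptions as the
    /// [`SqlCatalog::new`] example (crate path, in-memory object store):
    ///
    /// ```no_run
    /// # use iceberg_rust::{catalog::CatalogList, object_store::ObjectStoreBuilder};
    /// # use iceberg_sql_catalog::SqlCatalogList; // assumed crate name
    /// # async fn example() {
    /// let list = SqlCatalogList::new("sqlite://", ObjectStoreBuilder::memory())
    ///     .await
    ///     .unwrap();
    /// let catalog = list.catalog("warehouse"); // Option<Arc<dyn Catalog>>
    /// # }
    /// ```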
    pub async fn new(url: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = AnyPoolOptions::new();

        // SQLite gets a single connection; an in-memory database in
        // particular exists per connection.
        if url.starts_with("sqlite") {
            pool_options = pool_options.max_connections(1);
        }

        let pool = pool_options
            .after_connect(|connection, _| {
                Box::pin(async move {
                    connection
                        .execute(
                            "create table if not exists iceberg_tables (
                                catalog_name varchar(255) not null,
                                table_namespace varchar(255) not null,
                                table_name varchar(255) not null,
                                metadata_location varchar(255) not null,
                                previous_metadata_location varchar(255),
                                primary key (catalog_name, table_namespace, table_name)
                            );",
                        )
                        .await?;
                    connection
                        .execute(
                            "create table if not exists iceberg_namespace_properties (
                                catalog_name varchar(255) not null,
                                namespace varchar(255) not null,
                                property_key varchar(255),
                                property_value varchar(255),
                                primary key (catalog_name, namespace, property_key)
                            );",
                        )
                        .await?;
                    Ok(())
                })
            })
            .connect(url)
            .await?;

        Ok(SqlCatalogList { pool, object_store })
    }
}

#[async_trait]
impl CatalogList for SqlCatalogList {
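    /// Catalog names are implicit in this backend: any name yields a catalog
    /// sharing the pool, so this never returns `None` and does not verify
    /// that the name exists in the database.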
    fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
        Some(Arc::new(SqlCatalog {
            name: name.to_owned(),
            pool: self.pool.clone(),
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }))
    }

    async fn list_catalogs(&self) -> Vec<String> {
        // The trait method is infallible, so errors are swallowed and a
        // failed query yields an empty list.
        let rows = sqlx::query("select distinct catalog_name from iceberg_tables;")
            .fetch_all(&self.pool)
            .await
            .unwrap_or_default();

        rows.iter()
            .map(|row| row.try_get::<String, _>(0))
            .collect::<Result<_, sqlx::Error>>()
            .unwrap_or_default()
    }
}

#[cfg(test)]
pub mod tests {
    use std::{sync::Arc, time::Duration};

    use datafusion::{
        arrow::array::{Float64Array, Int64Array},
        common::tree_node::{TransformedResult, TreeNode},
        execution::SessionStateBuilder,
        prelude::SessionContext,
    };
    use datafusion_iceberg::{
        catalog::catalog::IcebergCatalog,
        planner::{iceberg_transform, IcebergQueryPlanner},
    };
    use iceberg_rust::{
        catalog::{namespace::Namespace, Catalog},
        object_store::{store::version_hint_content, ObjectStoreBuilder},
        spec::util::strip_prefix,
    };
    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
    use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
    use tokio::time::sleep;

    use crate::SqlCatalog;

    #[tokio::test]
    async fn test_create_update_drop_table() {
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        // Wait for the bucket creation to finish.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let postgres = Postgres::default()
            .with_db_name("postgres")
            .with_user("postgres")
            .with_password("postgres")
            .start()
            .await
            .unwrap();

        let postgres_host = postgres.get_host().await.unwrap();
        let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap();

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();

        let iceberg_catalog = Arc::new(
            SqlCatalog::new(
                &format!("postgres://postgres:postgres@{postgres_host}:{postgres_port}/postgres"),
                "warehouse",
                object_store,
            )
            .await
            .unwrap(),
        );

        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        let sql = "CREATE SCHEMA warehouse.tpch;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE lineitem (
            L_ORDERKEY BIGINT NOT NULL,
            L_PARTKEY BIGINT NOT NULL,
            L_SUPPKEY BIGINT NOT NULL,
            L_LINENUMBER INT NOT NULL,
            L_QUANTITY DOUBLE NOT NULL,
            L_EXTENDED_PRICE DOUBLE NOT NULL,
            L_DISCOUNT DOUBLE NOT NULL,
            L_TAX DOUBLE NOT NULL,
            L_RETURNFLAG CHAR NOT NULL,
            L_LINESTATUS CHAR NOT NULL,
            L_SHIPDATE DATE NOT NULL,
            L_COMMITDATE DATE NOT NULL,
            L_RECEIPTDATE DATE NOT NULL,
            L_SHIPINSTRUCT VARCHAR NOT NULL,
            L_SHIPMODE VARCHAR NOT NULL,
            L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
            L_ORDERKEY BIGINT NOT NULL,
            L_PARTKEY BIGINT NOT NULL,
            L_SUPPKEY BIGINT NOT NULL,
            L_LINENUMBER INT NOT NULL,
            L_QUANTITY DOUBLE NOT NULL,
            L_EXTENDED_PRICE DOUBLE NOT NULL,
            L_DISCOUNT DOUBLE NOT NULL,
            L_TAX DOUBLE NOT NULL,
            L_RETURNFLAG CHAR NOT NULL,
            L_LINESTATUS CHAR NOT NULL,
            L_SHIPDATE DATE NOT NULL,
            L_COMMITDATE DATE NOT NULL,
            L_RECEIPTDATE DATE NOT NULL,
            L_SHIPINSTRUCT VARCHAR NOT NULL,
            L_SHIPMODE VARCHAR NOT NULL,
            L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem");

        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let batches = ctx
            .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
            .await
            .expect("Failed to create plan for select")
            .collect()
            .await
            .expect("Failed to execute select query");

        // Spot-check two known aggregates and make sure at least one
        // non-empty batch was returned.
        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        let object_store = iceberg_catalog
            .default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));

        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        // The version hint file must match the metadata location the catalog
        // has cached for the table.
        let cache = iceberg_catalog.cache.read().unwrap();
        let entries = cache.values().collect::<Vec<_>>();
        let version = version_hint_content(&entries[0].0);

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), version);
    }
}
1018}