1use std::{
2 collections::HashMap,
3 sync::{Arc, RwLock},
4};
5
6use async_trait::async_trait;
7use iceberg_rust::{
8 catalog::{
9 commit::{
10 apply_table_updates, apply_view_updates, check_table_requirements,
11 check_view_requirements, CommitTable, CommitView, TableRequirement,
12 },
13 create::{CreateMaterializedView, CreateTable, CreateView},
14 identifier::Identifier,
15 namespace::Namespace,
16 tabular::Tabular,
17 Catalog, CatalogList,
18 },
19 error::Error as IcebergError,
20 materialized_view::MaterializedView,
21 object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
22 spec::{
23 materialized_view_metadata::MaterializedViewMetadata,
24 table_metadata::{new_metadata_location, TableMetadata},
25 tabular::TabularMetadata,
26 util::strip_prefix,
27 view_metadata::ViewMetadata,
28 },
29 table::Table,
30 view::View,
31};
32use object_store::ObjectStoreExt;
33use sqlx::{
34 any::{install_default_drivers, AnyPoolOptions, AnyRow},
35 pool::PoolOptions,
36 AnyPool, Executor, Row,
37};
38
39use crate::error::Error;
40
#[derive(Debug)]
/// An Iceberg catalog backed by a SQL database (via sqlx's `Any` driver).
///
/// Tabular metadata locations are stored in the `iceberg_tables` table and
/// namespace properties in `iceberg_namespace_properties`; the actual
/// metadata files live in the configured object store.
pub struct SqlCatalog {
    /// Logical catalog name; used as the `catalog_name` column in every query.
    name: String,
    /// Connection pool to the backing SQL database.
    pool: AnyPool,
    /// Builder used to construct per-bucket object stores on demand.
    object_store: ObjectStoreBuilder,
    /// Per-identifier cache of `(metadata_location, metadata)` pairs; consulted
    /// by the `update_*` methods to find the previous metadata location.
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}

pub mod error;
50
51impl SqlCatalog {
52 pub async fn new(
53 url: &str,
54 name: &str,
55 object_store: ObjectStoreBuilder,
56 ) -> Result<Self, Error> {
57 install_default_drivers();
58
59 let mut pool_options = PoolOptions::new();
60
61 if url == "sqlite://" {
62 pool_options = pool_options.max_connections(1);
63 }
64
65 let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
66 Box::pin(async move {
67 connection
68 .execute(
69 "create table if not exists iceberg_tables (
70 catalog_name varchar(255) not null,
71 table_namespace varchar(255) not null,
72 table_name varchar(255) not null,
73 metadata_location varchar(255) not null,
74 previous_metadata_location varchar(255),
75 primary key (catalog_name, table_namespace, table_name)
76 );",
77 )
78 .await?;
79 connection
80 .execute(
81 "create table if not exists iceberg_namespace_properties (
82 catalog_name varchar(255) not null,
83 namespace varchar(255) not null,
84 property_key varchar(255),
85 property_value varchar(255),
86 primary key (catalog_name, namespace, property_key)
87 );",
88 )
89 .await?;
90 Ok(())
91 })
92 })
93 .connect_lazy(url)?;
94
95 Ok(SqlCatalog {
96 name: name.to_owned(),
97 pool,
98 object_store,
99 cache: Arc::new(RwLock::new(HashMap::new())),
100 })
101 }
102
103 pub fn catalog_list(&self) -> Arc<SqlCatalogList> {
104 Arc::new(SqlCatalogList {
105 pool: self.pool.clone(),
106 object_store: self.object_store.clone(),
107 })
108 }
109 fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
110 Arc::new(self.object_store.build(bucket).unwrap())
111 }
112}
113
#[derive(Debug)]
/// A single row of the `iceberg_tables` table, as read back by [`query_map`].
struct TableRef {
    /// Dot-separated namespace the tabular belongs to.
    table_namespace: String,
    /// Name of the tabular within its namespace.
    table_name: String,
    /// Object-store path of the current metadata file.
    metadata_location: String,
    /// Object-store path of the previous metadata file, if any
    /// (NULL for freshly created tabulars). Currently unread, hence the
    /// leading underscore.
    _previous_metadata_location: Option<String>,
}
121
122fn query_map(row: &AnyRow) -> Result<TableRef, sqlx::Error> {
123 Ok(TableRef {
124 table_namespace: row.try_get(0)?,
125 table_name: row.try_get(1)?,
126 metadata_location: row.try_get(2)?,
127 _previous_metadata_location: row.try_get::<String, _>(3).map(Some).or_else(|err| {
128 if let sqlx::Error::ColumnDecode {
129 index: _,
130 source: _,
131 } = err
132 {
133 Ok(None)
134 } else {
135 Err(err)
136 }
137 })?,
138 })
139}
140
#[async_trait]
impl Catalog for SqlCatalog {
    /// Returns the configured name of this catalog.
    fn name(&self) -> &str {
        &self.name
    }

    /// Creates a namespace by inserting its properties into
    /// `iceberg_namespace_properties`. When no properties are supplied, a
    /// sentinel row (`'exists' = 'true'`) is inserted so the namespace is
    /// still discoverable by `list_namespaces`.
    ///
    /// NOTE(review): every query in this impl interpolates identifiers and
    /// values into the SQL string with `format!`. A value containing a
    /// single quote breaks the statement, and attacker-controlled names
    /// would allow SQL injection — prefer bind parameters via
    /// `sqlx::query(...).bind(...)`.
    async fn create_namespace(
        &self,
        namespace: &Namespace,
        properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace_str = namespace.to_string();
        let properties = properties.unwrap_or_default();

        // One INSERT per property; not transactional, so a failure midway
        // leaves earlier properties persisted.
        for (key, value) in &properties {
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        if properties.is_empty() {
            // Marker row so an empty namespace still shows up in listings.
            sqlx::query(&format!(
                "insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', 'exists', 'true');"
            ))
            .execute(&self.pool)
            .await
            .map_err(Error::from)?;
        }

        Ok(properties)
    }
    /// Not yet implemented.
    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }
    /// Not yet implemented.
    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }
    /// Not yet implemented.
    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }
    /// Not yet implemented.
    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }
    /// Lists all tabulars (tables, views, materialized views) registered in
    /// `namespace` for this catalog, as `namespace.name` identifiers.
    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let name = self.name.clone();
        let namespace = namespace.to_string();

        let rows = {
            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&self.pool).await.map_err(Error::from)?
        };
        let iter = rows.iter().map(query_map);

        // Re-parse "namespace.name" into an Identifier; parse failures are
        // surfaced as sqlx decode errors so one error type flows through.
        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Identifier::parse(&(y.table_namespace.to_string() + "." + &y.table_name), None)
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }
    /// Lists all namespaces of this catalog.
    ///
    /// NOTE(review): `_parent` is ignored, so nested-namespace filtering is
    /// not supported; only namespaces with at least one row in
    /// `iceberg_namespace_properties` are returned.
    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let name = self.name.clone();

        let rows = {
            sqlx::query(&format!(
                "select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';",
            ))
            .fetch_all(&self.pool)
            .await
            .map_err(Error::from)?
        };
        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));

        // Split the stored dot-separated namespace string back into levels.
        Ok(iter
            .map(|x| {
                x.and_then(|y| {
                    Namespace::try_new(&y.split('.').map(ToString::to_string).collect::<Vec<_>>())
                        .map_err(|err| sqlx::Error::Decode(Box::new(err)))
                })
            })
            .collect::<Result<_, sqlx::Error>>()
            .map_err(Error::from)?)
    }
    /// Returns `true` if a tabular with `identifier` is registered in this
    /// catalog (any matching row in `iceberg_tables`).
    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        let rows = {
            sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
            &namespace,
            &name)).fetch_all(&self.pool).await.map_err(Error::from)?
        };
        let mut iter = rows.iter().map(query_map);

        // Existence only; decode results of the row are never inspected.
        Ok(iter.next().is_some())
    }
    /// Removes the table's registration row.
    ///
    /// NOTE(review): succeeds silently when no row matches, and the metadata
    /// and data files in the object store are not deleted.
    async fn drop_table(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
        &namespace,
        &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }
    /// Removes the view's registration row.
    ///
    /// Identical to `drop_table`/`drop_materialized_view` because all
    /// tabular kinds share the `iceberg_tables` table.
    async fn drop_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
        &namespace,
        &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }
    /// Removes the materialized view's registration row.
    ///
    /// NOTE(review): the associated storage table registered by
    /// `create_materialized_view` is not removed here.
    async fn drop_materialized_view(&self, identifier: &Identifier) -> Result<(), IcebergError> {
        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();

        sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
        &namespace,
        &name)).execute(&self.pool).await.map_err(Error::from)?;
        Ok(())
    }
    /// Loads a tabular: resolves its metadata location from the database,
    /// fetches and deserializes the metadata JSON from the object store,
    /// caches it, and wraps it in the matching `Tabular` variant.
    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let path = {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();

            let row = {
                // NOTE(review): a missing row (table not found) is mapped to
                // `CatalogNotFound`, which is a misleading error for callers.
                sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
            &namespace,
            &name)).fetch_one(&self.pool).await.map_err(|_| IcebergError::CatalogNotFound)?
            };
            let row = query_map(&row).map_err(Error::from)?;

            row.metadata_location
        };

        let bucket = Bucket::from_path(&path)?;
        let object_store = self.default_object_store(bucket);

        let bytes = object_store
            .get(&strip_prefix(&path).as_str().into())
            .await?
            .bytes()
            .await?;
        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;
        // Cache the (location, metadata) pair so later update_* calls can
        // locate the previous metadata file.
        self.cache
            .write()
            .unwrap()
            .insert(identifier.clone(), (path.clone(), metadata.clone()));
        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }

    /// Creates a table: writes its metadata file (plus a best-effort
    /// version hint) to the object store, registers the metadata location in
    /// the database, and primes the cache.
    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        let metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Version hint is advisory; ignore failures.
        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    /// Creates a view; same flow as `create_table` but for view metadata.
    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let metadata: ViewMetadata = create_view.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Version hint is advisory; ignore failures.
        object_store.put_version_hint(&metadata_location).await.ok();
        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }

    /// Creates a materialized view together with its backing storage table.
    ///
    /// Both metadata files are written to the object store, then both
    /// registration rows are inserted inside a single database transaction
    /// so the view and its storage table appear atomically.
    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        let (create_view, create_table) = create_view.into();
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        let table_metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = new_metadata_location(&metadata);

        let table_metadata_location = new_metadata_location(&table_metadata);
        // The storage table's identifier comes from the view's current version.
        let table_identifier = metadata.current_version(None)?.storage_table();
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        // Version hint is advisory; ignore failures.
        object_store.put_version_hint(&metadata_location).await.ok();

        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;
        {
            let mut transaction = self.pool.begin().await.map_err(Error::from)?;
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            let table_catalog_name = self.name.clone();
            let table_namespace = table_identifier.namespace().to_string();
            let table_name = table_identifier.name().to_string();
            let table_metadata_location = table_metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{table_catalog_name}', '{table_namespace}', '{table_name}', '{table_metadata_location}');")).execute(&mut *transaction).await.map_err(Error::from)?;

            transaction.commit().await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }

    /// Commits updates to a table: validates requirements against the cached
    /// metadata, applies the updates, writes the new metadata file, and
    /// swaps the database row via an optimistic compare-and-set on
    /// `metadata_location`.
    ///
    /// NOTE(review): requires the table to be present in the cache (i.e.
    /// loaded or created through this catalog instance first), and
    /// `commit.requirements[0]` panics when the requirements list is empty.
    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            // NOTE(review): both branches return the identical error, so the
            // AssertCreate distinction is currently dead code — presumably
            // the AssertCreate path was meant to create the table instead.
            #[allow(clippy::if_same_then_else)]
            if !matches!(commit.requirements[0], TableRequirement::AssertCreate) {
                return Err(IcebergError::InvalidFormat(
                    "Create table assertion".to_owned(),
                ));
            } else {
                return Err(IcebergError::InvalidFormat(
                    "Create table assertion".to_owned(),
                ));
            }
        };
        let (previous_metadata_location, metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let TabularMetadata::Table(mut metadata) = metadata else {
            return Err(IcebergError::InvalidFormat(
                "Table update on entity that is not a table".to_owned(),
            ));
        };
        if !check_table_requirements(&commit.requirements, &metadata) {
            return Err(IcebergError::InvalidFormat(
                "Table requirements not valid".to_owned(),
            ));
        }
        apply_table_updates(&mut metadata, commit.updates)?;
        let metadata_location = new_metadata_location(&metadata);
        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;
        // Version hint is advisory; ignore failures.
        object_store.put_version_hint(&metadata_location).await.ok();

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        // The `metadata_location = previous` predicate makes this an
        // optimistic concurrency check: a concurrent commit changes the
        // stored location and this UPDATE then matches zero rows.
        // NOTE(review): an affected-row count of 0 is not checked, so a lost
        // race is not reported to the caller.
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    /// Commits updates to a view; same flow as `update_table` but using the
    /// view requirement/update helpers.
    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                // Version hint is advisory; ignore failures.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        // Optimistic compare-and-set on the stored metadata location; see
        // `update_table` for caveats.
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            // Unreachable in practice: the match above already rejected
            // non-view metadata.
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }
    /// Commits updates to a materialized view; mirrors `update_view` for the
    /// `MaterializedView` metadata variant.
    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<Identifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let bucket = Bucket::from_path(&previous_metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;

                let metadata_location = new_metadata_location(&*metadata);
                object_store
                    .put_metadata(&metadata_location, metadata.as_ref())
                    .await?;
                // Version hint is advisory; ignore failures.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        let catalog_name = self.name.clone();
        let namespace = identifier.namespace().to_string();
        let name = identifier.name().to_string();
        let metadata_file_location = metadata_location.to_string();
        let previous_metadata_file_location = previous_metadata_location.to_string();

        // Optimistic compare-and-set on the stored metadata location; see
        // `update_table` for caveats.
        sqlx::query(&format!("update iceberg_tables set metadata_location = '{metadata_file_location}', previous_metadata_location = '{previous_metadata_file_location}' where catalog_name = '{catalog_name}' and table_namespace = '{namespace}' and table_name = '{name}' and metadata_location = '{previous_metadata_file_location}';")).execute(&self.pool).await.map_err(Error::from)?;
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            // Unreachable in practice: the match above already rejected
            // non-materialized-view metadata.
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }

    /// Registers an existing table by its metadata file location: reads and
    /// parses the metadata from the object store, inserts the registration
    /// row, and primes the cache.
    ///
    /// NOTE(review): unlike `load_tabular`, the path is used without
    /// `strip_prefix`, so `metadata_location` is assumed to already be a
    /// bucket-relative path — confirm against callers.
    async fn register_table(
        self: Arc<Self>,
        identifier: Identifier,
        metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        let bucket = Bucket::from_path(metadata_location)?;
        let object_store = self.default_object_store(bucket);

        let metadata: TableMetadata = serde_json::from_slice(
            &object_store
                .get(&metadata_location.into())
                .await?
                .bytes()
                .await?,
        )?;

        {
            let catalog_name = self.name.clone();
            let namespace = identifier.namespace().to_string();
            let name = identifier.name().to_string();
            let metadata_location = metadata_location.to_string();

            sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{catalog_name}', '{namespace}', '{name}', '{metadata_location}');")).execute(&self.pool).await.map_err(Error::from)?;
        }
        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.to_string(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
}
673
674impl SqlCatalog {
675 pub fn duplicate(&self, name: &str) -> Self {
676 Self {
677 name: name.to_owned(),
678 pool: self.pool.clone(),
679 object_store: self.object_store.clone(),
680 cache: Arc::new(RwLock::new(HashMap::new())),
681 }
682 }
683}
684
#[derive(Debug)]
/// A list of [`SqlCatalog`]s sharing one SQL database; catalogs are
/// distinguished purely by the `catalog_name` column.
pub struct SqlCatalogList {
    /// Shared connection pool handed to every catalog produced by `catalog`.
    pool: AnyPool,
    /// Shared object-store configuration for all catalogs.
    object_store: ObjectStoreBuilder,
}
690
impl SqlCatalogList {
    /// Creates a new catalog list backed by the SQL database at `url`.
    ///
    /// Mirrors `SqlCatalog::new` (same DDL in the `after_connect` hook and
    /// the same single-connection restriction for sqlite), but connects
    /// eagerly via `connect` rather than `connect_lazy`.
    /// NOTE(review): the pool setup is duplicated from `SqlCatalog::new`;
    /// consider extracting a shared helper.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be established.
    pub async fn new(url: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        install_default_drivers();

        let mut pool_options = PoolOptions::new();

        // SQLite cannot serve concurrent writers (and in-memory databases
        // are per-connection), so cap the pool at one connection.
        if url.starts_with("sqlite") {
            pool_options = pool_options.max_connections(1);
        }

        let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
            Box::pin(async move {
                // Create the shared schema on first connect if necessary.
                connection
                    .execute(
                        "create table if not exists iceberg_tables (
                            catalog_name varchar(255) not null,
                            table_namespace varchar(255) not null,
                            table_name varchar(255) not null,
                            metadata_location varchar(255) not null,
                            previous_metadata_location varchar(255),
                            primary key (catalog_name, table_namespace, table_name)
                            );",
                    )
                    .await?;
                connection
                    .execute(
                        "create table if not exists iceberg_namespace_properties (
                            catalog_name varchar(255) not null,
                            namespace varchar(255) not null,
                            property_key varchar(255),
                            property_value varchar(255),
                            primary key (catalog_name, namespace, property_key)
                            );",
                    )
                    .await?;
                Ok(())
            })
        })
        .connect(url)
        .await?;

        Ok(SqlCatalogList { pool, object_store })
    }
}
735
736#[async_trait]
737impl CatalogList for SqlCatalogList {
738 fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
739 Some(Arc::new(SqlCatalog {
740 name: name.to_owned(),
741 pool: self.pool.clone(),
742 object_store: self.object_store.clone(),
743 cache: Arc::new(RwLock::new(HashMap::new())),
744 }))
745 }
746 async fn list_catalogs(&self) -> Vec<String> {
747 let rows = {
748 sqlx::query("select distinct catalog_name from iceberg_tables;")
749 .fetch_all(&self.pool)
750 .await
751 .map_err(Error::from)
752 .unwrap_or_default()
753 };
754 let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
755
756 iter.collect::<Result<_, sqlx::Error>>()
757 .map_err(Error::from)
758 .unwrap_or_default()
759 }
760}
761
#[cfg(test)]
pub mod tests {
    use datafusion::{
        arrow::array::{Float64Array, Int64Array},
        common::tree_node::{TransformedResult, TreeNode},
        execution::SessionStateBuilder,
        prelude::SessionContext,
    };
    use datafusion_iceberg::{
        catalog::catalog::IcebergCatalog,
        planner::{iceberg_transform, IcebergQueryPlanner},
    };
    use iceberg_rust::{
        catalog::{namespace::Namespace, Catalog},
        object_store::ObjectStoreBuilder,
        spec::util::strip_prefix,
    };
    use object_store::ObjectStoreExt;
    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
    use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
    use tokio::time::sleep;

    use crate::SqlCatalog;
    use iceberg_rust::object_store::store::version_hint_content;
    use std::{sync::Arc, time::Duration};

    /// End-to-end integration test: spins up LocalStack (S3) and Postgres in
    /// containers, creates a `SqlCatalog` over them, then drives DataFusion
    /// through schema/table creation, a CSV-backed insert, an aggregation
    /// query, and finally checks the version-hint file written to S3.
    /// Requires a working Docker daemon.
    #[tokio::test]
    async fn test_create_update_drop_table() {
        // S3-compatible store via LocalStack.
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        // Create the bucket the catalog will write into.
        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        // Poll until the bucket-creation command finishes.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        // Postgres backs the catalog's SQL tables.
        let postgres = Postgres::default()
            .with_db_name("postgres")
            .with_user("postgres")
            .with_password("postgres")
            .start()
            .await
            .unwrap();

        let postgres_host = postgres.get_host().await.unwrap();
        let postgres_port = postgres.get_host_port_ipv4(5432).await.unwrap();

        // NOTE(review): re-polls the already-finished bucket command —
        // presumably a copy-paste leftover; harmless but redundant.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        // Object store pointed at the LocalStack S3 endpoint.
        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();

        let iceberg_catalog = Arc::new(
            SqlCatalog::new(
                &format!("postgres://postgres:postgres@{postgres_host}:{postgres_port}/postgres"),
                "warehouse",
                object_store,
            )
            .await
            .unwrap(),
        );

        // Expose the catalog to DataFusion.
        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        // Create the target namespace via SQL DDL.
        let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Source table: CSV fixture with the TPC-H lineitem schema.
        let sql = "CREATE EXTERNAL TABLE lineitem (
        L_ORDERKEY BIGINT NOT NULL,
        L_PARTKEY BIGINT NOT NULL,
        L_SUPPKEY BIGINT NOT NULL,
        L_LINENUMBER INT NOT NULL,
        L_QUANTITY DOUBLE NOT NULL,
        L_EXTENDED_PRICE DOUBLE NOT NULL,
        L_DISCOUNT DOUBLE NOT NULL,
        L_TAX DOUBLE NOT NULL,
        L_RETURNFLAG CHAR NOT NULL,
        L_LINESTATUS CHAR NOT NULL,
        L_SHIPDATE DATE NOT NULL,
        L_COMMITDATE DATE NOT NULL,
        L_RECEIPTDATE DATE NOT NULL,
        L_SHIPINSTRUCT VARCHAR NOT NULL,
        L_SHIPMODE VARCHAR NOT NULL,
        L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Destination: an Iceberg table in the catalog, partitioned by
        // month(L_SHIPDATE).
        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
        L_ORDERKEY BIGINT NOT NULL,
        L_PARTKEY BIGINT NOT NULL,
        L_SUPPKEY BIGINT NOT NULL,
        L_LINENUMBER INT NOT NULL,
        L_QUANTITY DOUBLE NOT NULL,
        L_EXTENDED_PRICE DOUBLE NOT NULL,
        L_DISCOUNT DOUBLE NOT NULL,
        L_TAX DOUBLE NOT NULL,
        L_RETURNFLAG CHAR NOT NULL,
        L_LINESTATUS CHAR NOT NULL,
        L_SHIPDATE DATE NOT NULL,
        L_COMMITDATE DATE NOT NULL,
        L_RECEIPTDATE DATE NOT NULL,
        L_SHIPINSTRUCT VARCHAR NOT NULL,
        L_SHIPMODE VARCHAR NOT NULL,
        L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // The new table must show up in the catalog listing.
        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

        // Copy the CSV data into the Iceberg table.
        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Aggregate over the Iceberg table and spot-check known sums.
        let batches = ctx
            .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
            .await
            .expect("Failed to create plan for select")
            .collect()
            .await
            .expect("Failed to execute select query");

        // Guards against a silently empty result set.
        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                // Expected sums for two known part keys from the fixture.
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        // Verify the advisory version-hint file written next to the table
        // metadata matches the metadata location recorded in the cache.
        let object_store = iceberg_catalog
            .default_object_store(iceberg_rust::object_store::Bucket::S3("warehouse"));

        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        let cache = iceberg_catalog.cache.read().unwrap();
        let keys = cache.values().collect::<Vec<_>>();
        let version = version_hint_content(&keys[0].clone().0);

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), version);
    }
}
1018}