use std::{
    collections::HashMap,
    convert::identity,
    sync::{Arc, RwLock},
};

use async_trait::async_trait;
use futures::{future, TryStreamExt};
use iceberg_rust::{
    catalog::{
        commit::{
            apply_table_updates, apply_view_updates, check_table_requirements,
            check_view_requirements, CommitTable, CommitView,
        },
        create::{CreateMaterializedView, CreateTable, CreateView},
        identifier::Identifier,
        namespace::Namespace,
        tabular::Tabular,
        Catalog, CatalogList,
    },
    error::Error as IcebergError,
    materialized_view::MaterializedView,
    object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
    spec::{
        identifier::FullIdentifier,
        materialized_view_metadata::MaterializedViewMetadata,
        table_metadata::{new_metadata_location, TableMetadata},
        tabular::TabularMetadata,
        util::strip_prefix,
        view_metadata::ViewMetadata,
    },
    table::Table,
    view::View,
};
use object_store::ObjectStore;

use crate::error::Error;

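/// An Iceberg catalog that uses the layout of files in an object store as its
/// source of truth, with no external metastore.
///
/// Tabulars live under `<path>/<namespace>/<name>/metadata/`, and the current
/// metadata file is the `v<N>.metadata.json` entry with the highest version
/// number. The `cache` keeps the last metadata (and its location) read per
/// identifier, which the `update_*` methods use to validate commit
/// requirements and derive the next version.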
#[derive(Debug)]
pub struct FileCatalog {
    path: String,
    object_store: ObjectStoreBuilder,
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}

pub mod error;

impl FileCatalog {
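    /// Creates a catalog rooted at `path`, backed by the given
    /// [`ObjectStoreBuilder`].
    ///
    /// A minimal usage sketch (the warehouse path is illustrative, and the
    /// import path for `FileCatalog` depends on this crate's name; run inside
    /// an async context):
    ///
    /// ```ignore
    /// use iceberg_rust::object_store::ObjectStoreBuilder;
    ///
    /// let catalog = FileCatalog::new("s3://warehouse", ObjectStoreBuilder::memory())
    ///     .await
    ///     .unwrap();
    /// ```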
    pub async fn new(path: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        Ok(FileCatalog {
            path: path.to_owned(),
            object_store,
            cache: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    pub fn catalog_list(&self) -> Arc<FileCatalogList> {
        Arc::new(FileCatalogList {
            path: self.path.clone(),
            object_store: self.object_store.clone(),
        })
    }
}

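// Namespaces are implicit in this catalog: a namespace is simply the first
// path segment under the catalog root. `create_namespace` therefore has
// nothing to persist, and `list_namespaces` enumerates the root's common
// prefixes.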
#[async_trait]
impl Catalog for FileCatalog {
    fn name(&self) -> &str {
        self.path.trim_end_matches('/').split('/').last().unwrap()
    }

    async fn create_namespace(
        &self,
        _namespace: &Namespace,
        _properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        Ok(HashMap::new())
    }

    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }

    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }

    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }

    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }
    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        object_store
            .list(Some(
                &strip_prefix(&self.namespace_path(&namespace[0])).into(),
            ))
            .map_err(IcebergError::from)
            .map_ok(|x| {
                let path = x.location.as_ref();
                self.identifier(path)
            })
            .try_collect()
            .await
    }

    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        object_store
            .list_with_delimiter(Some(
                &strip_prefix(self.path.trim_start_matches('/')).into(),
            ))
            .await
            .map_err(IcebergError::from)?
            .common_prefixes
            .into_iter()
            .map(|x| self.namespace(x.as_ref()))
            .collect::<Result<_, IcebergError>>()
    }
    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
        self.metadata_location(identifier)
            .await
            .map(|_| true)
            .or(Ok(false))
    }

    async fn drop_table(&self, _identifier: &Identifier) -> Result<(), IcebergError> {
        todo!()
    }

    async fn drop_view(&self, _identifier: &Identifier) -> Result<(), IcebergError> {
        todo!()
    }

    async fn drop_materialized_view(&self, _identifier: &Identifier) -> Result<(), IcebergError> {
        todo!()
    }
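    // Loading a tabular also primes the commit cache: the metadata location
    // and the parsed metadata are stored so that later `update_*` calls can
    // check commit requirements against the version that was actually read.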
    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let metadata_location = self.metadata_location(identifier).await?;

        let bytes = object_store
            .get(&strip_prefix(&metadata_location).as_str().into())
            .await
            .map_err(|_| IcebergError::CatalogNotFound)?
            .bytes()
            .await?;

        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );

        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }

    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        mut create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "Table already exists".to_owned(),
            ));
        }

        create_table.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        mut create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "View already exists".to_owned(),
            ));
        }

        create_view.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: ViewMetadata = create_view.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }

    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "Materialized view already exists".to_owned(),
            ));
        }

        let (mut create_view, mut create_table) = create_view.into();

        create_view.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        let table_identifier = metadata.current_version(None)?.storage_table();

        create_table.location =
            Some(self.tabular_path(&table_identifier.namespace()[0], table_identifier.name()));
        let table_metadata: TableMetadata = create_table.try_into()?;
        let location = metadata.location.to_string();

        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        let table_metadata_location =
            table_metadata.location.clone() + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();

        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }

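    // Commits use a write-then-publish protocol: the updated metadata is
    // first written to a temporary location, then copied to the next
    // `v<N+1>.metadata.json` with `copy_if_not_exists`, so a concurrent
    // commit that already claimed that version makes this one fail instead
    // of silently overwriting it. The version hint is refreshed best-effort.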
    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            // Without a cached entry there is no previous metadata version to
            // commit against, so the commit is rejected whether or not it
            // asserts table creation.
            return Err(IcebergError::InvalidFormat(
                "Table to update not found in catalog cache".to_owned(),
            ));
        };
        let (previous_metadata_location, metadata) = entry;

        let TabularMetadata::Table(mut metadata) = metadata else {
            return Err(IcebergError::InvalidFormat(
                "Table update on entity that is not a table".to_owned(),
            ));
        };

        if !check_table_requirements(&commit.requirements, &metadata) {
            return Err(IcebergError::InvalidFormat(
                "Table requirements not valid".to_owned(),
            ));
        }

        apply_table_updates(&mut metadata, commit.updates)?;
        let temp_metadata_location = new_metadata_location(&metadata);

        object_store
            .put_metadata(&temp_metadata_location, metadata.as_ref())
            .await?;

        let metadata_location =
            new_filesystem_metadata_location(&metadata.location, &previous_metadata_location)?;

        object_store
            .copy_if_not_exists(
                &strip_prefix(&temp_metadata_location).into(),
                &strip_prefix(&metadata_location).into(),
            )
            .await?;

        object_store.put_version_hint(&metadata_location).await.ok();

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }

    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "View to update not found in catalog cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let temp_metadata_location = new_metadata_location(&*metadata);

                object_store
                    .put_metadata(&temp_metadata_location, metadata.as_ref())
                    .await?;

                let metadata_location = new_filesystem_metadata_location(
                    &metadata.location,
                    &previous_metadata_location,
                )?;

                object_store
                    .copy_if_not_exists(
                        &strip_prefix(&temp_metadata_location).into(),
                        &strip_prefix(&metadata_location).into(),
                    )
                    .await?;

                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );

        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }

    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<FullIdentifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let identifier = commit.identifier;
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Materialized view to update not found in catalog cache".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;

        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                let temp_metadata_location = new_metadata_location(&*metadata);

                object_store
                    .put_metadata(&temp_metadata_location, metadata.as_ref())
                    .await?;

                let metadata_location = new_filesystem_metadata_location(
                    &metadata.location,
                    &previous_metadata_location,
                )?;

                object_store
                    .copy_if_not_exists(
                        &strip_prefix(&temp_metadata_location).into(),
                        &strip_prefix(&metadata_location).into(),
                    )
                    .await?;

                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );

        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }

    async fn register_table(
        self: Arc<Self>,
        _identifier: Identifier,
        _metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        unimplemented!()
    }
}

impl FileCatalog {
    fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
        Arc::new(self.object_store.build(bucket).unwrap())
    }

    fn namespace_path(&self, namespace: &str) -> String {
        self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace
    }

    fn tabular_path(&self, namespace: &str, name: &str) -> String {
        self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace + "/" + name
    }

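    /// Resolves the current metadata file for a tabular by listing
    /// `<tabular path>/metadata/` and returning the `v<N>.metadata.json`
    /// entry with the highest version number. Fails with `CatalogNotFound`
    /// when no metadata file exists.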
    async fn metadata_location(&self, identifier: &Identifier) -> Result<String, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let path = self.tabular_path(&identifier.namespace()[0], identifier.name()) + "/metadata";
        let mut files: Vec<String> = object_store
            .list(Some(&strip_prefix(&path).into()))
            .map_ok(|x| x.location.to_string())
            .try_filter(|x| {
                future::ready(
                    x.ends_with("metadata.json")
                        && x.starts_with((strip_prefix(&path) + "/v").trim_start_matches('/')),
                )
            })
            .try_collect()
            .await
            .map_err(IcebergError::from)?;
        files.sort_by_key(|x| parse_version(x).unwrap());
        files
            .into_iter()
            .next_back()
            .ok_or(IcebergError::CatalogNotFound)
    }

    fn identifier(&self, path: &str) -> Identifier {
        let parts: Vec<&str> = trim_start_path(path)
            .trim_start_matches(trim_start_path(&self.path))
            .trim_start_matches('/')
            .split('/')
            .take(2)
            .collect();
        Identifier::new(&[parts[0].to_owned()], parts[1])
    }

    fn namespace(&self, path: &str) -> Result<Namespace, IcebergError> {
        let parts = trim_start_path(path)
            .trim_start_matches(trim_start_path(&self.path))
            .trim_start_matches('/')
            .split('/')
            .next()
            .ok_or(IcebergError::InvalidFormat("Namespace in path".to_owned()))?
            .to_owned();
        Namespace::try_new(&[parts]).map_err(IcebergError::from)
    }
}

fn trim_start_path(path: &str) -> &str {
    path.trim_start_matches('/').trim_start_matches("s3://")
}

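/// Extracts the numeric version from a metadata file name, e.g.
/// `.../metadata/v12.metadata.json` parses to `12`.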
fn parse_version(path: &str) -> Result<u64, IcebergError> {
    path.split('/')
        .next_back()
        .ok_or(IcebergError::InvalidFormat("Metadata location".to_owned()))?
        .trim_start_matches('v')
        .trim_end_matches(".metadata.json")
        .parse()
        .map_err(IcebergError::from)
}

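/// Builds the location of the next metadata version by incrementing the
/// version of the previous metadata file: a previous location ending in
/// `v3.metadata.json` yields `<metadata_location>/metadata/v4.metadata.json`.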
fn new_filesystem_metadata_location(
    metadata_location: &str,
    previous_metadata_location: &str,
) -> Result<String, IcebergError> {
    let current_version = parse_version(previous_metadata_location)? + 1;
    Ok(metadata_location.to_string()
        + "/metadata/v"
        + &current_version.to_string()
        + ".metadata.json")
}

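/// A [`CatalogList`] over the same layout: every top-level directory under
/// `path` is exposed as its own [`FileCatalog`].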
#[derive(Debug)]
pub struct FileCatalogList {
    path: String,
    object_store: ObjectStoreBuilder,
}

impl FileCatalogList {
    pub async fn new(path: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
        Ok(FileCatalogList {
            path: path.to_owned(),
            object_store,
        })
    }

    fn parse_catalog(&self, path: &str) -> Result<String, IcebergError> {
        trim_start_path(path.trim_start_matches(trim_start_path(&self.path)))
            .trim_start_matches('/')
            .split('/')
            .next()
            .ok_or(IcebergError::InvalidFormat("Catalog in path".to_owned()))
            .map(ToOwned::to_owned)
    }
}

#[async_trait]
impl CatalogList for FileCatalogList {
    fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
        Some(Arc::new(FileCatalog {
            path: self.path.clone() + "/" + name,
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }))
    }

    async fn list_catalogs(&self) -> Vec<String> {
        let bucket = Bucket::from_path(&self.path).unwrap();
        let object_store = self.object_store.build(bucket).unwrap();

        object_store
            .list_with_delimiter(Some(&strip_prefix(trim_start_path(&self.path)).into()))
            .await
            .map_err(IcebergError::from)
            .unwrap()
            .common_prefixes
            .into_iter()
            .map(|x| self.parse_catalog(x.as_ref()))
            .collect::<Result<_, IcebergError>>()
            .unwrap()
    }
}

#[cfg(test)]
pub mod tests {
    use datafusion::{
        arrow::array::{Float64Array, Int64Array},
        common::tree_node::{TransformedResult, TreeNode},
        execution::SessionStateBuilder,
        prelude::{SessionConfig, SessionContext},
    };
    use datafusion_iceberg::{
        catalog::catalog::IcebergCatalog,
        planner::{iceberg_transform, IcebergQueryPlanner},
    };
    use futures::StreamExt;
    use iceberg_rust::{
        catalog::{namespace::Namespace, Catalog},
        object_store::{Bucket, ObjectStoreBuilder},
        spec::util::strip_prefix,
    };
    use std::{sync::Arc, time::Duration};
    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
    use testcontainers_modules::localstack::LocalStack;
    use tokio::time::sleep;

    use crate::FileCatalog;

    #[tokio::test]
    async fn test_create_update_drop_table() {
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();

        let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
            FileCatalog::new("s3://warehouse", object_store.clone())
                .await
                .unwrap(),
        );

        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        let mut config = SessionConfig::default();

        config.options_mut().execution.minimum_parallel_output_files = 1;
        config
            .options_mut()
            .execution
            .parquet
            .maximum_parallel_row_group_writers = 4;

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        let sql = "CREATE SCHEMA warehouse.tpch;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE lineitem (
            L_ORDERKEY BIGINT NOT NULL,
            L_PARTKEY BIGINT NOT NULL,
            L_SUPPKEY BIGINT NOT NULL,
            L_LINENUMBER INT NOT NULL,
            L_QUANTITY DOUBLE NOT NULL,
            L_EXTENDED_PRICE DOUBLE NOT NULL,
            L_DISCOUNT DOUBLE NOT NULL,
            L_TAX DOUBLE NOT NULL,
            L_RETURNFLAG CHAR NOT NULL,
            L_LINESTATUS CHAR NOT NULL,
            L_SHIPDATE DATE NOT NULL,
            L_COMMITDATE DATE NOT NULL,
            L_RECEIPTDATE DATE NOT NULL,
            L_SHIPINSTRUCT VARCHAR NOT NULL,
            L_SHIPMODE VARCHAR NOT NULL,
            L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
            L_ORDERKEY BIGINT NOT NULL,
            L_PARTKEY BIGINT NOT NULL,
            L_SUPPKEY BIGINT NOT NULL,
            L_LINENUMBER INT NOT NULL,
            L_QUANTITY DOUBLE NOT NULL,
            L_EXTENDED_PRICE DOUBLE NOT NULL,
            L_DISCOUNT DOUBLE NOT NULL,
            L_TAX DOUBLE NOT NULL,
            L_RETURNFLAG CHAR NOT NULL,
            L_LINESTATUS CHAR NOT NULL,
            L_SHIPDATE DATE NOT NULL,
            L_COMMITDATE DATE NOT NULL,
            L_RECEIPTDATE DATE NOT NULL,
            L_SHIPINSTRUCT VARCHAR NOT NULL,
            L_SHIPMODE VARCHAR NOT NULL,
            L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem';";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        let batches = ctx
            .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
            .await
            .expect("Failed to create plan for select")
            .collect()
            .await
            .expect("Failed to execute select query");

        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        let object_store = object_store
            .build(Bucket::from_path("s3://warehouse").unwrap())
            .unwrap();

        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), "1");

        let files = object_store.list(None).collect::<Vec<_>>().await;

        assert_eq!(
            files
                .iter()
                .filter(|x| x
                    .as_ref()
                    .unwrap()
                    .location
                    .extension()
                    .is_some_and(|x| x == "parquet"))
                .count(),
            1
        );
    }

    #[tokio::test]
    async fn test_namespace_path_normal_case() {
        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
            .await
            .unwrap();
        assert_eq!(
            test_struct.namespace_path("test_namespace"),
            "/base/path/test_namespace"
        );
    }

    #[tokio::test]
    async fn test_namespace_path_s3() {
        let test_struct = FileCatalog::new("s3://base/path", ObjectStoreBuilder::memory())
            .await
            .unwrap();
        assert_eq!(
            test_struct.namespace_path("test_namespace"),
            "s3://base/path/test_namespace"
        );
    }

    #[tokio::test]
    async fn test_identifier_normal_case() {
        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
            .await
            .unwrap();

        let result = test_struct.identifier("/base/path/test_namespace/test_table");
        assert_eq!(result.namespace()[0], "test_namespace");
        assert_eq!(result.name(), "test_table");
    }

    #[tokio::test]
    async fn test_namespace_normal_case() {
        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
            .await
            .unwrap();

        let result = test_struct.namespace("/base/path/test_namespace").unwrap();
        assert_eq!(result.as_ref(), &["test_namespace".to_string()]);
    }
}