// iceberg_file_catalog — lib.rs

1use std::{
2    collections::HashMap,
3    convert::identity,
4    sync::{Arc, RwLock},
5};
6
7use async_trait::async_trait;
8use futures::{future, TryStreamExt};
9use iceberg_rust::{
10    catalog::{
11        commit::{
12            apply_table_updates, apply_view_updates, check_table_requirements,
13            check_view_requirements, CommitTable, CommitView, TableRequirement,
14        },
15        create::{CreateMaterializedView, CreateTable, CreateView},
16        identifier::Identifier,
17        namespace::Namespace,
18        tabular::Tabular,
19        Catalog, CatalogList,
20    },
21    error::Error as IcebergError,
22    materialized_view::MaterializedView,
23    object_store::{store::IcebergStore, Bucket, ObjectStoreBuilder},
24    spec::{
25        identifier::FullIdentifier,
26        materialized_view_metadata::MaterializedViewMetadata,
27        table_metadata::{new_metadata_location, TableMetadata},
28        tabular::TabularMetadata,
29        util::strip_prefix,
30        view_metadata::ViewMetadata,
31    },
32    table::Table,
33    view::View,
34};
35use object_store::ObjectStore;
36
37use crate::error::Error;
38
/// A file-/object-store backed Iceberg catalog.
///
/// Tables and views are discovered purely by listing paths under `path`
/// (`<path>/<namespace>/<name>/metadata/vN.metadata.json`); there is no
/// separate catalog database.
#[derive(Debug)]
pub struct FileCatalog {
    // Base path of the catalog, e.g. "s3://warehouse" or "/base/path".
    path: String,
    // Builder used to construct object stores for buckets derived from paths.
    object_store: ObjectStoreBuilder,
    // Last-seen (metadata location, metadata) per identifier. Commits look up
    // the previous metadata location here, so an entity must have been loaded
    // or created through this instance before it can be committed to.
    cache: Arc<RwLock<HashMap<Identifier, (String, TabularMetadata)>>>,
}
45
46pub mod error;
47
48impl FileCatalog {
49    pub async fn new(path: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
50        Ok(FileCatalog {
51            path: path.to_owned(),
52            object_store,
53            cache: Arc::new(RwLock::new(HashMap::new())),
54        })
55    }
56
57    pub fn catalog_list(&self) -> Arc<FileCatalogList> {
58        Arc::new(FileCatalogList {
59            path: self.path.clone(),
60            object_store: self.object_store.clone(),
61        })
62    }
63}
64
65#[async_trait]
66impl Catalog for FileCatalog {
67    /// Catalog name
68    fn name(&self) -> &str {
69        self.path.trim_end_matches('/').split("/").last().unwrap()
70    }
    /// Create a namespace in the catalog
    ///
    /// Namespaces are implicit directory prefixes in this catalog, so nothing
    /// is materialized; the supplied properties are dropped and an empty
    /// property map is returned.
    async fn create_namespace(
        &self,
        _namespace: &Namespace,
        _properties: Option<HashMap<String, String>>,
    ) -> Result<HashMap<String, String>, IcebergError> {
        Ok(HashMap::new())
    }
    /// Drop a namespace in the catalog
    ///
    /// Not yet implemented; panics if called.
    async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
        todo!()
    }
    /// Load the namespace properties from the catalog
    ///
    /// Not yet implemented; panics if called.
    async fn load_namespace(
        &self,
        _namespace: &Namespace,
    ) -> Result<HashMap<String, String>, IcebergError> {
        todo!()
    }
    /// Update the namespace properties in the catalog
    ///
    /// Not yet implemented; panics if called.
    async fn update_namespace(
        &self,
        _namespace: &Namespace,
        _updates: Option<HashMap<String, String>>,
        _removals: Option<Vec<String>>,
    ) -> Result<(), IcebergError> {
        todo!()
    }
    /// Check if a namespace exists
    ///
    /// Not yet implemented; panics if called.
    async fn namespace_exists(&self, _namespace: &Namespace) -> Result<bool, IcebergError> {
        todo!()
    }
    /// Lists tabulars (tables/views) in `namespace` by listing every object
    /// under `<path>/<namespace>` and mapping each object path back to an
    /// `Identifier`.
    ///
    /// NOTE(review): every object yields an identifier, so a tabular with
    /// several files produces duplicate identifiers — confirm callers dedupe.
    /// Only the first level of the namespace is used; nested namespaces are
    /// not supported by the path layout.
    async fn list_tabulars(&self, namespace: &Namespace) -> Result<Vec<Identifier>, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        object_store
            .list(Some(
                &strip_prefix(&self.namespace_path(&namespace[0])).into(),
            ))
            .map_err(IcebergError::from)
            .map_ok(|x| {
                let path = x.location.as_ref();
                self.identifier(path)
            })
            .try_collect()
            .await
    }
    /// Lists namespaces as the common prefixes (first-level "directories")
    /// directly under the catalog path.
    ///
    /// `_parent` is ignored: the path layout only supports one namespace level.
    async fn list_namespaces(&self, _parent: Option<&str>) -> Result<Vec<Namespace>, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        object_store
            .list_with_delimiter(Some(
                &strip_prefix(self.path.trim_start_matches('/')).into(),
            ))
            .await
            .map_err(IcebergError::from)?
            .common_prefixes
            .into_iter()
            .map(|x| self.namespace(x.as_ref()))
            .collect::<Result<_, IcebergError>>()
    }
134    async fn tabular_exists(&self, identifier: &Identifier) -> Result<bool, IcebergError> {
135        self.metadata_location(identifier)
136            .await
137            .map(|_| true)
138            .or(Ok(false))
139    }
140    async fn drop_table(&self, _identifierr: &Identifier) -> Result<(), IcebergError> {
141        todo!()
142    }
    /// Dropping views is not implemented; panics if called.
    async fn drop_view(&self, _identifier: &Identifier) -> Result<(), IcebergError> {
        todo!()
    }
    /// Dropping materialized views is not implemented; panics if called.
    async fn drop_materialized_view(&self, _identifier: &Identifier) -> Result<(), IcebergError> {
        todo!()
    }
    /// Loads a table, view, or materialized view by resolving and parsing its
    /// latest metadata file.
    ///
    /// The parsed metadata and its location are cached so that later commits
    /// can find the previous metadata location.
    async fn load_tabular(
        self: Arc<Self>,
        identifier: &Identifier,
    ) -> Result<Tabular, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        // Highest-versioned vN.metadata.json under the tabular's metadata dir.
        let metadata_location = self.metadata_location(identifier).await?;

        let bytes = object_store
            .get(&strip_prefix(&metadata_location).as_str().into())
            .await
            // Any fetch failure is collapsed into "not found".
            .map_err(|_| IcebergError::CatalogNotFound)?
            .bytes()
            .await?;

        let metadata: TabularMetadata = serde_json::from_slice(&bytes)?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );

        // Wrap in the entity type matching the metadata variant.
        match metadata {
            TabularMetadata::Table(metadata) => Ok(Tabular::Table(
                Table::new(
                    identifier.clone(),
                    self.clone(),
                    object_store.clone(),
                    metadata,
                )
                .await?,
            )),
            TabularMetadata::View(metadata) => Ok(Tabular::View(
                View::new(identifier.clone(), self.clone(), metadata).await?,
            )),
            TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView(
                MaterializedView::new(identifier.clone(), self.clone(), metadata).await?,
            )),
        }
    }
190
    /// Creates a new table at `<catalog path>/<namespace>/<name>`.
    ///
    /// The initial metadata is written as `v0.metadata.json`, a best-effort
    /// version hint is written, and the metadata is cached for later commits.
    async fn create_table(
        self: Arc<Self>,
        identifier: Identifier,
        mut create_table: CreateTable,
    ) -> Result<Table, IcebergError> {
        // `tabular_exists` never errors, so this is effectively "exists == true".
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "Table already exists. Path".to_owned(),
            ));
        }
        // The catalog dictates the location; any caller-provided one is replaced.
        create_table.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Best effort: failures to write the hint are ignored.
        object_store.put_version_hint(&metadata_location).await.ok();

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(Table::new(
            identifier.clone(),
            self.clone(),
            object_store.clone(),
            metadata,
        )
        .await?)
    }
231
    /// Creates a new view at `<catalog path>/<namespace>/<name>`.
    ///
    /// Mirrors `create_table`: writes `v0.metadata.json`, a best-effort
    /// version hint, and caches the metadata for later commits.
    async fn create_view(
        self: Arc<Self>,
        identifier: Identifier,
        mut create_view: CreateView<Option<()>>,
    ) -> Result<View, IcebergError> {
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "View already exists. Path".to_owned(),
            ));
        }

        // The catalog dictates the location; any caller-provided one is replaced.
        create_view.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: ViewMetadata = create_view.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Best effort: failures to write the hint are ignored.
        object_store.put_version_hint(&metadata_location).await.ok();

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );
        Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
    }
267
    /// Creates a materialized view together with its backing storage table.
    ///
    /// Both metadata documents are written as `v0.metadata.json`. Only the
    /// view gets a version hint, and only the view's metadata is cached.
    async fn create_materialized_view(
        self: Arc<Self>,
        identifier: Identifier,
        create_view: CreateMaterializedView,
    ) -> Result<MaterializedView, IcebergError> {
        if self.tabular_exists(&identifier).await.is_ok_and(identity) {
            return Err(IcebergError::InvalidFormat(
                "View already exists. Path".to_owned(),
            ));
        }

        // Split into the view definition and its storage-table definition.
        let (mut create_view, mut create_table) = create_view.into();

        create_view.location =
            Some(self.tabular_path(&identifier.namespace()[0], identifier.name()));
        let metadata: MaterializedViewMetadata = create_view.try_into()?;
        // The storage table's identifier comes from the view's current version.
        let table_identifier = metadata.current_version(None)?.storage_table();

        create_table.location =
            Some(self.tabular_path(&table_identifier.namespace()[0], table_identifier.name()));
        let table_metadata: TableMetadata = create_table.try_into()?;
        // Create metadata
        let location = metadata.location.to_string();

        // Write metadata to object_store
        let bucket = Bucket::from_path(&location)?;
        let object_store = self.default_object_store(bucket);

        let metadata_location = location + "/metadata/v0.metadata.json";

        let table_metadata_location =
            table_metadata.location.clone() + "/metadata/v0.metadata.json";

        object_store
            .put_metadata(&metadata_location, metadata.as_ref())
            .await?;

        // Best effort: failures to write the hint are ignored.
        object_store.put_version_hint(&metadata_location).await.ok();

        // The storage table's metadata is written without a version hint.
        object_store
            .put_metadata(&table_metadata_location, table_metadata.as_ref())
            .await?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone().into()),
        );

        Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
    }
318
319    async fn update_table(self: Arc<Self>, commit: CommitTable) -> Result<Table, IcebergError> {
320        let bucket = Bucket::from_path(&self.path)?;
321        let object_store = self.object_store.build(bucket)?;
322
323        let identifier = commit.identifier;
324        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
325            #[allow(clippy::if_same_then_else)]
326            if !matches!(commit.requirements[0], TableRequirement::AssertCreate) {
327                return Err(IcebergError::InvalidFormat(
328                    "Create table assertion".to_owned(),
329                ));
330            } else {
331                return Err(IcebergError::InvalidFormat(
332                    "Create table assertion".to_owned(),
333                ));
334            }
335        };
336        let (previous_metadata_location, metadata) = entry;
337
338        let TabularMetadata::Table(mut metadata) = metadata else {
339            return Err(IcebergError::InvalidFormat(
340                "Table update on entity that is not a table".to_owned(),
341            ));
342        };
343        if !check_table_requirements(&commit.requirements, &metadata) {
344            return Err(IcebergError::InvalidFormat(
345                "Table requirements not valid".to_owned(),
346            ));
347        }
348        apply_table_updates(&mut metadata, commit.updates)?;
349        let temp_metadata_location = new_metadata_location(&metadata);
350
351        object_store
352            .put_metadata(&temp_metadata_location, metadata.as_ref())
353            .await?;
354
355        let metadata_location =
356            new_filesystem_metadata_location(&metadata.location, &previous_metadata_location)?;
357
358        object_store
359            .copy_if_not_exists(
360                &strip_prefix(&temp_metadata_location).into(),
361                &strip_prefix(&metadata_location).into(),
362            )
363            .await?;
364
365        object_store.put_version_hint(&metadata_location).await.ok();
366
367        self.cache.write().unwrap().insert(
368            identifier.clone(),
369            (metadata_location.clone(), metadata.clone().into()),
370        );
371
372        Ok(Table::new(
373            identifier.clone(),
374            self.clone(),
375            object_store.clone(),
376            metadata,
377        )
378        .await?)
379    }
380
    /// Commits staged updates to a view.
    ///
    /// Mirrors `update_table`: requirements are checked against the cached
    /// metadata, updates are applied, and the new metadata is staged at a
    /// temporary location before being promoted to the next version with
    /// `copy_if_not_exists`.
    async fn update_view(
        self: Arc<Self>,
        commit: CommitView<Option<()>>,
    ) -> Result<View, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let identifier = commit.identifier;
        // A cached previous metadata location is required to commit.
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;
        let metadata_location = match &mut metadata {
            TabularMetadata::View(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "View requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                // Stage at a unique temporary location first.
                let temp_metadata_location = new_metadata_location(&*metadata);

                object_store
                    .put_metadata(&temp_metadata_location, metadata.as_ref())
                    .await?;

                // Next sequential version; the copy fails if already claimed.
                let metadata_location = new_filesystem_metadata_location(
                    &metadata.location,
                    &previous_metadata_location,
                )?;

                object_store
                    .copy_if_not_exists(
                        &strip_prefix(&temp_metadata_location).into(),
                        &strip_prefix(&metadata_location).into(),
                    )
                    .await?;

                // Best effort version hint.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "View update on entity that is not a view".to_owned(),
            )),
        }?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::View(metadata) = metadata {
            Ok(View::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a view".to_owned(),
            ))
        }
    }
    /// Commits staged updates to a materialized view.
    ///
    /// Same commit protocol as `update_view`: validate requirements against
    /// the cached metadata, apply updates, stage at a temporary location, then
    /// promote to the next version with `copy_if_not_exists`.
    async fn update_materialized_view(
        self: Arc<Self>,
        commit: CommitView<FullIdentifier>,
    ) -> Result<MaterializedView, IcebergError> {
        let bucket = Bucket::from_path(&self.path)?;
        let object_store = self.object_store.build(bucket)?;

        let identifier = commit.identifier;
        // A cached previous metadata location is required to commit.
        let Some(entry) = self.cache.read().unwrap().get(&identifier).cloned() else {
            return Err(IcebergError::InvalidFormat(
                "Create table assertion".to_owned(),
            ));
        };
        let (previous_metadata_location, mut metadata) = entry;
        let metadata_location = match &mut metadata {
            TabularMetadata::MaterializedView(metadata) => {
                if !check_view_requirements(&commit.requirements, metadata) {
                    return Err(IcebergError::InvalidFormat(
                        "Materialized view requirements not valid".to_owned(),
                    ));
                }
                apply_view_updates(metadata, commit.updates)?;
                // Stage at a unique temporary location first.
                let temp_metadata_location = new_metadata_location(&*metadata);

                object_store
                    .put_metadata(&temp_metadata_location, metadata.as_ref())
                    .await?;

                // Next sequential version; the copy fails if already claimed.
                let metadata_location = new_filesystem_metadata_location(
                    &metadata.location,
                    &previous_metadata_location,
                )?;

                object_store
                    .copy_if_not_exists(
                        &strip_prefix(&temp_metadata_location).into(),
                        &strip_prefix(&metadata_location).into(),
                    )
                    .await?;

                // Best effort version hint.
                object_store.put_version_hint(&metadata_location).await.ok();

                Ok(metadata_location)
            }
            _ => Err(IcebergError::InvalidFormat(
                "Materialized view update on entity that is not a materialized view".to_owned(),
            )),
        }?;

        self.cache.write().unwrap().insert(
            identifier.clone(),
            (metadata_location.clone(), metadata.clone()),
        );
        if let TabularMetadata::MaterializedView(metadata) = metadata {
            Ok(MaterializedView::new(identifier.clone(), self.clone(), metadata).await?)
        } else {
            Err(IcebergError::InvalidFormat(
                "Entity is not a materialized view".to_owned(),
            ))
        }
    }
503
    /// Registering an existing metadata file is not supported by this catalog;
    /// panics if called.
    async fn register_table(
        self: Arc<Self>,
        _identifier: Identifier,
        _metadata_location: &str,
    ) -> Result<Table, IcebergError> {
        unimplemented!()
    }
511}
512
513impl FileCatalog {
514    fn default_object_store(&self, bucket: Bucket) -> Arc<dyn object_store::ObjectStore> {
515        Arc::new(self.object_store.build(bucket).unwrap())
516    }
517    fn namespace_path(&self, namespace: &str) -> String {
518        self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace
519    }
520
521    fn tabular_path(&self, namespace: &str, name: &str) -> String {
522        self.path.as_str().trim_end_matches('/').to_owned() + "/" + namespace + "/" + name
523    }
524
525    async fn metadata_location(&self, identifier: &Identifier) -> Result<String, IcebergError> {
526        let bucket = Bucket::from_path(&self.path)?;
527        let object_store = self.object_store.build(bucket)?;
528
529        let path = self.tabular_path(&identifier.namespace()[0], identifier.name()) + "/metadata";
530        let mut files: Vec<String> = object_store
531            .list(Some(&strip_prefix(&path).into()))
532            .map_ok(|x| x.location.to_string())
533            .try_filter(|x| {
534                future::ready(
535                    x.ends_with("metadata.json")
536                        && x.starts_with((strip_prefix(&path) + "/v").trim_start_matches('/')),
537                )
538            })
539            .try_collect()
540            .await
541            .map_err(IcebergError::from)?;
542        files.sort_by(|x, y| {
543            let x = x
544                .trim_start_matches((strip_prefix(&path) + "/v").trim_start_matches("/"))
545                .trim_end_matches("/")
546                .trim_end_matches(".metadata.json")
547                .parse::<usize>()
548                .unwrap();
549            let y = y
550                .trim_start_matches((strip_prefix(&path) + "/v").trim_start_matches("/"))
551                .trim_end_matches("/")
552                .trim_end_matches(".metadata.json")
553                .parse::<usize>()
554                .unwrap();
555            x.cmp(&y)
556        });
557        files
558            .into_iter()
559            .next_back()
560            .ok_or(IcebergError::CatalogNotFound)
561    }
562
563    fn identifier(&self, path: &str) -> Identifier {
564        let parts: Vec<&str> = trim_start_path(path)
565            .trim_start_matches(trim_start_path(&self.path))
566            .trim_start_matches('/')
567            .split('/')
568            .take(2)
569            .collect();
570        Identifier::new(&[parts[0].to_owned()], parts[1])
571    }
572
573    fn namespace(&self, path: &str) -> Result<Namespace, IcebergError> {
574        let parts = trim_start_path(path)
575            .trim_start_matches(trim_start_path(&self.path))
576            .trim_start_matches('/')
577            .split('/')
578            .next()
579            .ok_or(IcebergError::InvalidFormat("Namespace in path".to_owned()))?
580            .to_owned();
581        Namespace::try_new(&[parts]).map_err(IcebergError::from)
582    }
583}
584
/// Normalizes a path for prefix comparisons: strips any leading `/`
/// characters first, then a leading `s3://` scheme.
fn trim_start_path(path: &str) -> &str {
    let without_slashes = path.trim_start_matches('/');
    without_slashes.trim_start_matches("s3://")
}
588
589fn parse_version(path: &str) -> Result<u64, IcebergError> {
590    path.split('/')
591        .next_back()
592        .ok_or(IcebergError::InvalidFormat("Metadata location".to_owned()))?
593        .trim_start_matches('v')
594        .trim_end_matches(".metadata.json")
595        .parse()
596        .map_err(IcebergError::from)
597}
598
599fn new_filesystem_metadata_location(
600    metadata_location: &str,
601    previous_metadata_location: &str,
602) -> Result<String, IcebergError> {
603    let current_version = parse_version(previous_metadata_location)? + 1;
604    Ok(metadata_location.to_string()
605        + "/metadata/v"
606        + &current_version.to_string()
607        + ".metadata.json")
608}
609
/// Exposes the first-level directories under a base path as separate
/// [`FileCatalog`]s.
#[derive(Debug)]
pub struct FileCatalogList {
    // Base path containing one sub-directory per catalog.
    path: String,
    // Builder shared by all catalogs produced from this list.
    object_store: ObjectStoreBuilder,
}
615
616impl FileCatalogList {
617    pub async fn new(path: &str, object_store: ObjectStoreBuilder) -> Result<Self, Error> {
618        Ok(FileCatalogList {
619            path: path.to_owned(),
620            object_store,
621        })
622    }
623
624    fn parse_catalog(&self, path: &str) -> Result<String, IcebergError> {
625        trim_start_path(path.trim_start_matches(trim_start_path(&self.path)))
626            .trim_start_matches('/')
627            .split('/')
628            .next()
629            .ok_or(IcebergError::InvalidFormat("Catalog in path".to_owned()))
630            .map(ToOwned::to_owned)
631    }
632}
633
#[async_trait]
impl CatalogList for FileCatalogList {
    /// Returns a catalog rooted at `<list path>/<name>` with a fresh, empty
    /// cache. Construction is lazy: no check is made that the catalog exists.
    fn catalog(&self, name: &str) -> Option<Arc<dyn Catalog>> {
        Some(Arc::new(FileCatalog {
            path: self.path.clone() + "/" + name,
            object_store: self.object_store.clone(),
            cache: Arc::new(RwLock::new(HashMap::new())),
        }))
    }
    /// Lists catalogs as the first-level common prefixes under the list path.
    ///
    /// NOTE(review): store construction and listing errors are `unwrap()`ed
    /// and panic, because the trait signature returns a plain `Vec` — confirm
    /// this is acceptable for callers.
    async fn list_catalogs(&self) -> Vec<String> {
        let bucket = Bucket::from_path(&self.path).unwrap();
        let object_store = self.object_store.build(bucket).unwrap();

        object_store
            .list_with_delimiter(Some(&strip_prefix(trim_start_path(&self.path)).into()))
            .await
            .map_err(IcebergError::from)
            .unwrap()
            .common_prefixes
            .into_iter()
            .map(|x| self.parse_catalog(x.as_ref()))
            .collect::<Result<_, IcebergError>>()
            .unwrap()
    }
}
659
660#[cfg(test)]
661pub mod tests {
662    use datafusion::{
663        arrow::array::{Float64Array, Int64Array},
664        common::tree_node::{TransformedResult, TreeNode},
665        execution::SessionStateBuilder,
666        prelude::{SessionConfig, SessionContext},
667    };
668    use datafusion_iceberg::{
669        catalog::catalog::IcebergCatalog,
670        planner::{iceberg_transform, IcebergQueryPlanner},
671    };
672    use futures::StreamExt;
673    use iceberg_rust::{
674        catalog::{namespace::Namespace, Catalog},
675        object_store::{Bucket, ObjectStoreBuilder},
676        spec::util::strip_prefix,
677    };
678    use std::{sync::Arc, time::Duration};
679    use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
680    use testcontainers_modules::localstack::LocalStack;
681    use tokio::time::sleep;
682    // use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
683    // use testcontainers_modules::localstack::LocalStack;
684
685    use crate::FileCatalog;
686
    /// End-to-end test against a LocalStack S3 bucket: creates a schema and an
    /// Iceberg table through DataFusion, inserts CSV-sourced data, then checks
    /// query results, the version hint, and the number of parquet files.
    #[tokio::test]
    async fn test_create_update_drop_table() {
        // Start LocalStack with only the S3 service enabled.
        let localstack = LocalStack::default()
            .with_env_var("SERVICES", "s3")
            .with_env_var("AWS_ACCESS_KEY_ID", "user")
            .with_env_var("AWS_SECRET_ACCESS_KEY", "password")
            .start()
            .await
            .unwrap();

        // Create the "warehouse" bucket inside the container.
        let command = localstack
            .exec(ExecCommand::new(vec![
                "awslocal",
                "s3api",
                "create-bucket",
                "--bucket",
                "warehouse",
            ]))
            .await
            .unwrap();

        // Poll until the bucket-creation command has finished.
        while command.exit_code().await.unwrap().is_none() {
            sleep(Duration::from_millis(100)).await;
        }

        let localstack_host = localstack.get_host().await.unwrap();
        let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();

        // S3 object store pointed at the LocalStack endpoint.
        let object_store = ObjectStoreBuilder::s3()
            .with_config("aws_access_key_id", "user")
            .unwrap()
            .with_config("aws_secret_access_key", "password")
            .unwrap()
            .with_config(
                "endpoint",
                format!("http://{localstack_host}:{localstack_port}"),
            )
            .unwrap()
            .with_config("region", "us-east-1")
            .unwrap()
            .with_config("allow_http", "true")
            .unwrap();
        // let object_store = ObjectStoreBuilder::memory();

        // File catalog rooted at the bucket, wrapped for DataFusion.
        let iceberg_catalog: Arc<dyn Catalog> = Arc::new(
            FileCatalog::new("s3://warehouse", object_store.clone())
                .await
                .unwrap(),
        );

        let catalog = Arc::new(
            IcebergCatalog::new(iceberg_catalog.clone(), None)
                .await
                .unwrap(),
        );

        // Force a single output file so the parquet-count assertion below holds.
        let mut config = SessionConfig::default();

        config.options_mut().execution.minimum_parallel_output_files = 1;
        config
            .options_mut()
            .execution
            .parquet
            .maximum_parallel_row_group_writers = 4;

        let state = SessionStateBuilder::new()
            .with_config(config)
            .with_default_features()
            .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.register_catalog("warehouse", catalog);

        // Create the tpch schema (namespace) in the iceberg catalog.
        let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Register the CSV source data as an external table.
        let sql = "CREATE EXTERNAL TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL, 
    L_PARTKEY BIGINT NOT NULL, 
    L_SUPPKEY BIGINT NOT NULL, 
    L_LINENUMBER INT NOT NULL, 
    L_QUANTITY DOUBLE NOT NULL, 
    L_EXTENDED_PRICE DOUBLE NOT NULL, 
    L_DISCOUNT DOUBLE NOT NULL, 
    L_TAX DOUBLE NOT NULL, 
    L_RETURNFLAG CHAR NOT NULL, 
    L_LINESTATUS CHAR NOT NULL, 
    L_SHIPDATE DATE NOT NULL, 
    L_COMMITDATE DATE NOT NULL, 
    L_RECEIPTDATE DATE NOT NULL, 
    L_SHIPINSTRUCT VARCHAR NOT NULL, 
    L_SHIPMODE VARCHAR NOT NULL, 
    L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION '../../datafusion_iceberg/testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Create the Iceberg table with the same schema in the bucket.
        let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem ( 
    L_ORDERKEY BIGINT NOT NULL, 
    L_PARTKEY BIGINT NOT NULL, 
    L_SUPPKEY BIGINT NOT NULL, 
    L_LINENUMBER INT NOT NULL, 
    L_QUANTITY DOUBLE NOT NULL, 
    L_EXTENDED_PRICE DOUBLE NOT NULL, 
    L_DISCOUNT DOUBLE NOT NULL, 
    L_TAX DOUBLE NOT NULL, 
    L_RETURNFLAG CHAR NOT NULL, 
    L_LINESTATUS CHAR NOT NULL, 
    L_SHIPDATE DATE NOT NULL, 
    L_COMMITDATE DATE NOT NULL, 
    L_RECEIPTDATE DATE NOT NULL, 
    L_SHIPINSTRUCT VARCHAR NOT NULL, 
    L_SHIPMODE VARCHAR NOT NULL, 
    L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem';";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // The new table must show up when listing the namespace.
        let tables = iceberg_catalog
            .clone()
            .list_tabulars(
                &Namespace::try_new(&["tpch".to_owned()]).expect("Failed to create namespace"),
            )
            .await
            .expect("Failed to list Tables");
        assert_eq!(tables[0].to_string(), "tpch.lineitem".to_owned());

        // Copy the CSV data into the Iceberg table (commits version 1).
        let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";

        let plan = ctx.state().create_logical_plan(sql).await.unwrap();

        let transformed = plan.transform(iceberg_transform).data().unwrap();

        ctx.execute_logical_plan(transformed)
            .await
            .unwrap()
            .collect()
            .await
            .expect("Failed to execute query plan.");

        // Aggregate over the inserted data.
        let batches = ctx
        .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
        .await
        .expect("Failed to create plan for select")
        .collect()
        .await
        .expect("Failed to execute select query");

        // Tracks that at least one non-empty batch was checked.
        let mut once = false;

        for batch in batches {
            if batch.num_rows() != 0 {
                let (amounts, product_ids) = (
                    batch
                        .column(0)
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                    batch
                        .column(1)
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap(),
                );
                // Spot-check two known part keys from the fixture data.
                for (product_id, amount) in product_ids.iter().zip(amounts) {
                    if product_id.unwrap() == 24027 {
                        assert_eq!(amount.unwrap(), 24.0)
                    } else if product_id.unwrap() == 63700 {
                        assert_eq!(amount.unwrap(), 23.0)
                    }
                }
                once = true
            }
        }

        assert!(once);

        let object_store = object_store
            .build(Bucket::from_path("s3://warehouse").unwrap())
            .unwrap();

        // After the insert commit the version hint must point at version 1.
        let version_hint = object_store
            .get(&strip_prefix("s3://warehouse/tpch/lineitem/metadata/version-hint.text").into())
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        assert_eq!(std::str::from_utf8(&version_hint).unwrap(), "1");

        let files = object_store.list(None).collect::<Vec<_>>().await;

        // With minimum_parallel_output_files = 1, exactly one parquet file
        // should have been written.
        assert_eq!(
            files
                .iter()
                .filter(|x| x
                    .as_ref()
                    .unwrap()
                    .location
                    .extension()
                    .is_some_and(|x| x == "parquet"))
                .count(),
            1
        );
    }
921
922    #[tokio::test]
923    async fn test_namespace_path_normal_case() {
924        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
925            .await
926            .unwrap();
927        assert_eq!(
928            test_struct.namespace_path("test_namespace"),
929            "/base/path/test_namespace"
930        );
931    }
932
933    #[tokio::test]
934    async fn test_namespace_path_s3() {
935        let test_struct = FileCatalog::new("s3://base/path", ObjectStoreBuilder::memory())
936            .await
937            .unwrap();
938        assert_eq!(
939            test_struct.namespace_path("test_namespace"),
940            "s3://base/path/test_namespace"
941        );
942    }
943
944    #[tokio::test]
945    async fn test_identifier_normal_case() {
946        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
947            .await
948            .unwrap();
949
950        let result = test_struct.identifier("/base/path/test_namespace/test_table");
951        assert_eq!(result.namespace()[0], "test_namespace");
952        assert_eq!(result.name(), "test_table");
953    }
954
955    #[tokio::test]
956    async fn test_namespace_normal_case() {
957        let test_struct = FileCatalog::new("/base/path", ObjectStoreBuilder::memory())
958            .await
959            .unwrap();
960
961        let result = test_struct.namespace("/base/path/test_namespace").unwrap();
962        assert_eq!(result.as_ref(), &["test_namespace".to_string()]);
963    }
964}