iceberg_datafusion/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use datafusion::catalog::{CatalogProvider, SchemaProvider};
23use futures::future::try_join_all;
24use iceberg::{Catalog, NamespaceIdent, Result};
25
26use crate::schema::IcebergSchemaProvider;
27
/// Provides an interface to manage and access multiple schemas
/// within an Iceberg [`Catalog`].
///
/// Acts as a centralized catalog provider that aggregates
/// multiple [`SchemaProvider`]s, each associated with a distinct namespace.
///
/// The set of schemas is held in memory as a plain map; this type itself
/// exposes no way to add, remove, or refresh entries after construction.
#[derive(Debug)]
pub struct IcebergCatalogProvider {
    /// A `HashMap` where keys are namespace names
    /// and values are dynamic references to objects implementing the
    /// [`SchemaProvider`] trait.
    schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}
40
41impl IcebergCatalogProvider {
42    /// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
43    /// using the given client to fetch and initialize schema providers for
44    /// each namespace in the Iceberg [`Catalog`].
45    ///
46    /// This method retrieves the list of namespace names
47    /// attempts to create a schema provider for each namespace, and
48    /// collects these providers into a `HashMap`.
49    pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
50        // TODO:
51        // Schemas and providers should be cached and evicted based on time
52        // As of right now; schemas might become stale.
53        let schema_names: Vec<_> = client
54            .list_namespaces(None)
55            .await?
56            .iter()
57            .flat_map(|ns| ns.as_ref().clone())
58            .collect();
59
60        let providers = try_join_all(
61            schema_names
62                .iter()
63                .map(|name| {
64                    IcebergSchemaProvider::try_new(
65                        client.clone(),
66                        NamespaceIdent::new(name.clone()),
67                    )
68                })
69                .collect::<Vec<_>>(),
70        )
71        .await?;
72
73        let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
74            .into_iter()
75            .zip(providers.into_iter())
76            .map(|(name, provider)| {
77                let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
78                (name, provider)
79            })
80            .collect();
81
82        Ok(IcebergCatalogProvider { schemas })
83    }
84}
85
86impl CatalogProvider for IcebergCatalogProvider {
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90
91    fn schema_names(&self) -> Vec<String> {
92        self.schemas.keys().cloned().collect()
93    }
94
95    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
96        self.schemas.get(name).cloned()
97    }
98}