iceberg_datafusion/catalog.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use datafusion::catalog::{CatalogProvider, SchemaProvider};
23use futures::future::try_join_all;
24use iceberg::{Catalog, NamespaceIdent, Result};
25
26use crate::schema::IcebergSchemaProvider;
27
28/// Provides an interface to manage and access multiple schemas
29/// within an Iceberg [`Catalog`].
30///
31/// Acts as a centralized catalog provider that aggregates
32/// multiple [`SchemaProvider`], each associated with distinct namespaces.
33#[derive(Debug)]
34pub struct IcebergCatalogProvider {
35 /// A `HashMap` where keys are namespace names
36 /// and values are dynamic references to objects implementing the
37 /// [`SchemaProvider`] trait.
38 schemas: HashMap<String, Arc<dyn SchemaProvider>>,
39}
40
41impl IcebergCatalogProvider {
42 /// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
43 /// using the given client to fetch and initialize schema providers for
44 /// each namespace in the Iceberg [`Catalog`].
45 ///
46 /// This method retrieves the list of namespace names
47 /// attempts to create a schema provider for each namespace, and
48 /// collects these providers into a `HashMap`.
49 pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
50 // TODO:
51 // Schemas and providers should be cached and evicted based on time
52 // As of right now; schemas might become stale.
53 let schema_names: Vec<_> = client
54 .list_namespaces(None)
55 .await?
56 .iter()
57 .flat_map(|ns| ns.as_ref().clone())
58 .collect();
59
60 let providers = try_join_all(
61 schema_names
62 .iter()
63 .map(|name| {
64 IcebergSchemaProvider::try_new(
65 client.clone(),
66 NamespaceIdent::new(name.clone()),
67 )
68 })
69 .collect::<Vec<_>>(),
70 )
71 .await?;
72
73 let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
74 .into_iter()
75 .zip(providers.into_iter())
76 .map(|(name, provider)| {
77 let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
78 (name, provider)
79 })
80 .collect();
81
82 Ok(IcebergCatalogProvider { schemas })
83 }
84}
85
86impl CatalogProvider for IcebergCatalogProvider {
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90
91 fn schema_names(&self) -> Vec<String> {
92 self.schemas.keys().cloned().collect()
93 }
94
95 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
96 self.schemas.get(name).cloned()
97 }
98}