#![cfg(feature = "rest-catalog")]
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use iceberg::table::Table;
use iceberg::{
Catalog, Namespace, NamespaceIdent, Result as IcebergResult, TableCommit, TableCreation,
TableIdent,
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use crate::catalog::CatalogError;
/// Wrapper around an Iceberg [`RestCatalog`].
///
/// The wrapped catalog is held behind an [`Arc`], so cloning this struct
/// clones the handle and every clone refers to the same `RestCatalog`.
#[derive(Clone, Debug)]
pub struct KrishivRestCatalog {
    /// The wrapped REST catalog client.
    inner: Arc<RestCatalog>,
}
impl KrishivRestCatalog {
pub async fn new(
uri: &str,
warehouse: &str,
token: Option<&str>,
) -> Result<Self, CatalogError> {
let mut builder = RestCatalogConfig::builder()
.uri(uri.to_string())
.warehouse(warehouse.to_string());
if let Some(t) = token {
builder = builder.token(t.to_string());
}
let config = builder.build();
Ok(Self {
inner: Arc::new(RestCatalog::new(config)),
})
}
pub fn inner(&self) -> &RestCatalog {
&self.inner
}
}
#[async_trait]
impl Catalog for KrishivRestCatalog {
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
) -> IcebergResult<Vec<NamespaceIdent>> {
self.inner.list_namespaces(parent).await
}
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> IcebergResult<Namespace> {
self.inner.create_namespace(namespace, properties).await
}
async fn get_namespace(&self, namespace: &NamespaceIdent) -> IcebergResult<Namespace> {
self.inner.get_namespace(namespace).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> IcebergResult<bool> {
self.inner.namespace_exists(namespace).await
}
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> IcebergResult<()> {
self.inner.update_namespace(namespace, properties).await
}
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> IcebergResult<()> {
self.inner.drop_namespace(namespace).await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> IcebergResult<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> IcebergResult<Table> {
self.inner.create_table(namespace, creation).await
}
async fn load_table(&self, table: &TableIdent) -> IcebergResult<Table> {
self.inner.load_table(table).await
}
async fn drop_table(&self, table: &TableIdent) -> IcebergResult<()> {
self.inner.drop_table(table).await
}
async fn table_exists(&self, table: &TableIdent) -> IcebergResult<bool> {
self.inner.table_exists(table).await
}
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> IcebergResult<()> {
self.inner.rename_table(src, dest).await
}
async fn register_table(
&self,
table: &TableIdent,
metadata_location: String,
) -> IcebergResult<Table> {
self.inner.register_table(table, metadata_location).await
}
async fn update_table(&self, commit: TableCommit) -> IcebergResult<Table> {
self.inner.update_table(commit).await
}
}