#![cfg(all(feature = "iceberg-datafusion", feature = "local-catalog"))]
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DfResult;
use crate::catalog::iceberg_table_provider::iceberg_scan::iceberg_table_provider;
use crate::catalog::unified::KrishivCatalog;
const CATALOG_CACHE_TTL: Duration = Duration::from_secs(30);
struct CatalogMetaCache {
namespaces: Option<(Vec<String>, Instant)>,
tables: HashMap<String, (Vec<String>, Instant)>,
}
impl CatalogMetaCache {
fn new() -> Self {
Self {
namespaces: None,
tables: HashMap::new(),
}
}
fn get_namespaces(&self) -> Option<&Vec<String>> {
self.namespaces
.as_ref()
.filter(|(_, t)| t.elapsed() < CATALOG_CACHE_TTL)
.map(|(v, _)| v)
}
fn set_namespaces(&mut self, ns: Vec<String>) {
self.namespaces = Some((ns, Instant::now()));
}
fn get_tables(&self, namespace: &str) -> Option<&Vec<String>> {
self.tables
.get(namespace)
.filter(|(_, t)| t.elapsed() < CATALOG_CACHE_TTL)
.map(|(v, _)| v)
}
fn set_tables(&mut self, namespace: impl Into<String>, tables: Vec<String>) {
self.tables
.insert(namespace.into(), (tables, Instant::now()));
}
}
#[derive(Clone)]
pub struct IcebergCatalogBridge {
catalog: Arc<KrishivCatalog>,
catalog_name: String,
cache: Arc<Mutex<CatalogMetaCache>>,
}
impl IcebergCatalogBridge {
pub fn new(catalog: Arc<KrishivCatalog>, catalog_name: impl Into<String>) -> Self {
Self {
catalog,
catalog_name: catalog_name.into(),
cache: Arc::new(Mutex::new(CatalogMetaCache::new())),
}
}
pub fn catalog_name(&self) -> &str {
&self.catalog_name
}
fn block_on<F: std::future::Future>(fut: F) -> F::Output {
match tokio::runtime::Handle::try_current() {
Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fut)),
Err(_) => tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build fallback Tokio runtime")
.block_on(fut),
}
}
}
impl fmt::Debug for IcebergCatalogBridge {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IcebergCatalogBridge")
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl CatalogProvider for IcebergCatalogBridge {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
{
let cache = self.cache.lock().unwrap_or_else(|p| p.into_inner());
if let Some(ns) = cache.get_namespaces() {
return ns.clone();
}
}
let ns = Self::block_on(self.catalog.list_namespaces()).unwrap_or_default();
self.cache
.lock()
.unwrap_or_else(|p| p.into_inner())
.set_namespaces(ns.clone());
ns
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let cached_exists = {
let cache = self.cache.lock().unwrap_or_else(|p| p.into_inner());
cache
.get_namespaces()
.map(|ns| ns.iter().any(|n| n == name))
};
let exists = match cached_exists {
Some(found) => found,
None => {
let ns = Self::block_on(self.catalog.list_namespaces()).ok()?;
let found = ns.iter().any(|n| n == name);
self.cache
.lock()
.unwrap_or_else(|p| p.into_inner())
.set_namespaces(ns);
found
}
};
if exists {
Some(Arc::new(IcebergSchemaBridge {
catalog: self.catalog.clone(),
namespace: name.to_string(),
cache: self.cache.clone(),
}))
} else {
None
}
}
}
struct IcebergSchemaBridge {
catalog: Arc<KrishivCatalog>,
namespace: String,
cache: Arc<Mutex<CatalogMetaCache>>,
}
impl fmt::Debug for IcebergSchemaBridge {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IcebergSchemaBridge")
.field("namespace", &self.namespace)
.finish()
}
}
#[async_trait::async_trait]
impl SchemaProvider for IcebergSchemaBridge {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
{
let cache = self.cache.lock().unwrap_or_else(|p| p.into_inner());
if let Some(tables) = cache.get_tables(&self.namespace) {
return tables.clone();
}
}
let tables = IcebergCatalogBridge::block_on(self.catalog.list_tables(&self.namespace))
.unwrap_or_default();
self.cache
.lock()
.unwrap_or_else(|p| p.into_inner())
.set_tables(&self.namespace, tables.clone());
tables
}
async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
match self.catalog.load_table(&self.namespace, name).await {
Ok(table) => {
let provider = iceberg_table_provider(table).await?;
Ok(Some(provider))
}
Err(e) => {
let msg = e.to_string().to_ascii_lowercase();
if msg.contains("not found") || msg.contains("does not exist") {
Ok(None)
} else {
Err(datafusion::error::DataFusionError::External(Box::new(e)))
}
}
}
}
fn table_exist(&self, name: &str) -> bool {
{
let cache = self.cache.lock().unwrap_or_else(|p| p.into_inner());
if let Some(tables) = cache.get_tables(&self.namespace) {
return tables.iter().any(|t| t == name);
}
}
let tables = IcebergCatalogBridge::block_on(self.catalog.list_tables(&self.namespace))
.unwrap_or_default();
let exists = tables.iter().any(|t| t == name);
self.cache
.lock()
.unwrap_or_else(|p| p.into_inner())
.set_tables(&self.namespace, tables);
exists
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use super::*;
use crate::catalog::unified::KrishivCatalog;
fn sample_schema() -> Schema {
Schema::builder()
.with_schema_id(0)
.with_fields(vec![Arc::new(NestedField::required(
1,
"id",
Type::Primitive(PrimitiveType::Long),
))])
.build()
.unwrap()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn iceberg_catalog_bridge_lists_namespaces_as_schemas() {
let dir = tempfile::tempdir().unwrap();
let catalog = Arc::new(KrishivCatalog::local(dir.path()).await.unwrap());
catalog
.create_table("sales", "orders", sample_schema(), "")
.await
.unwrap();
let bridge = IcebergCatalogBridge::new(catalog, "iceberg");
let schemas = bridge.schema_names();
assert!(schemas.contains(&"sales".to_string()));
assert!(bridge.schema("sales").is_some());
assert!(bridge.schema("does_not_exist").is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn iceberg_catalog_bridge_table_provider_returns_iceberg_provider() {
let dir = tempfile::tempdir().unwrap();
let catalog = Arc::new(KrishivCatalog::local(dir.path()).await.unwrap());
catalog
.create_table("sales", "orders", sample_schema(), "")
.await
.unwrap();
let bridge = IcebergCatalogBridge::new(catalog, "iceberg");
let schema = bridge.schema("sales").expect("namespace schema");
assert!(schema.table_exist("orders"));
let provider = schema.table("orders").await.unwrap();
assert!(provider.is_some(), "iceberg table provider expected");
let provider = provider.unwrap();
let arrow_schema = TableProvider::schema(&*provider);
assert!(arrow_schema.field_with_name("id").is_ok());
}
}