use std::sync::Arc;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::table::Table;
use iceberg::{Result, TableIdent};
use moka::future::Cache;
/// Default capacity for the table-handle cache.
///
/// NOTE(review): presumably this is the value callers pass to
/// `TableHandleCache::new` when no explicit capacity is configured; the use
/// site is not visible in this file, so confirm against the caller.
pub const DEFAULT_TABLE_HANDLE_CACHE_CAPACITY: u64 = 16_384;
/// Optional cache of already-built [`Table`] handles.
///
/// Entries are keyed by the table's metadata location string. Caching can be
/// switched off entirely by constructing the cache with a capacity of zero.
#[derive(Clone)]
pub(crate) struct TableHandleCache {
/// Backing cache; `None` means caching is disabled, in which case lookups
/// always miss and freshly built tables are not stored.
inner: Option<Cache<String, Table>>,
}
impl TableHandleCache {
    /// Creates a cache bounded by `capacity`.
    ///
    /// A `capacity` of zero disables caching: no backing cache is allocated,
    /// every lookup misses, and nothing is ever stored.
    pub(crate) fn new(capacity: u64) -> Self {
        let inner = (capacity != 0).then(|| {
            Cache::builder()
                .name("nornir-catalog.table-handles")
                .max_capacity(capacity)
                .build()
        });
        Self { inner }
    }

    /// Looks up the table cached under `metadata_location`.
    ///
    /// Returns `None` when caching is disabled, when no entry exists for the
    /// location, or when the cached table's identifier differs from
    /// `identifier`.
    pub(crate) async fn get(
        &self,
        metadata_location: &str,
        identifier: &TableIdent,
    ) -> Option<Table> {
        match &self.inner {
            Some(cache) => cache
                .get(metadata_location)
                .await
                // An entry stored for a different identifier is treated as a miss.
                .filter(|cached| cached.identifier() == identifier),
            None => None,
        }
    }

    /// Builds a [`Table`] from the given parts and, when caching is enabled,
    /// stores a clone of it under `metadata_location`.
    ///
    /// # Errors
    ///
    /// Returns the error from `build` if the table cannot be constructed; in
    /// that case nothing is inserted into the cache.
    pub(crate) async fn build_and_insert(
        &self,
        fileio: &FileIO,
        identifier: &TableIdent,
        metadata_location: String,
        metadata: Arc<TableMetadata>,
    ) -> Result<Table> {
        // The location is needed twice: once by the table itself and once as
        // the cache key.
        let location_for_table = metadata_location.clone();
        let table = build(fileio, identifier, location_for_table, metadata)?;
        if let Some(cache) = self.inner.as_ref() {
            let cached_copy = table.clone();
            cache.insert(metadata_location, cached_copy).await;
        }
        Ok(table)
    }
}
/// Compact debug output: reports the entry count when caching is enabled and
/// `enabled: false` when it is not; cached tables themselves are not printed.
impl std::fmt::Debug for TableHandleCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TableHandleCache");
        if let Some(cache) = self.inner.as_ref() {
            // NOTE(review): moka describes `entry_count` as an estimate, so
            // this is diagnostic only — confirm before relying on it.
            out.field("entries", &cache.entry_count());
        } else {
            out.field("enabled", &false);
        }
        out.finish()
    }
}
/// Assembles a [`Table`] handle from its parts.
///
/// `fileio` and `identifier` are cloned into the builder; `metadata_location`
/// and `metadata` are moved in.
///
/// # Errors
///
/// Returns whatever error the table builder reports from `build()`.
fn build(
    fileio: &FileIO,
    identifier: &TableIdent,
    metadata_location: String,
    metadata: Arc<TableMetadata>,
) -> Result<Table> {
    // The builder's result is returned as-is; unwrapping it with `?` only to
    // re-wrap it in `Ok` added nothing.
    Table::builder()
        .file_io(fileio.clone())
        .identifier(identifier.clone())
        .metadata_location(metadata_location)
        .metadata(metadata)
        .build()
}