use std::sync::Arc;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::{Error, Result};
use moka::future::Cache;
/// Default capacity for the metadata cache: 128 MiB (`128 * 1024 * 1024`).
///
/// NOTE(review): not referenced elsewhere in this part of the file; presumably
/// callers pass it as `max_bytes` to `MetadataCache::new` — confirm at the call site.
pub const DEFAULT_METADATA_CACHE_BYTES: u64 = 128 * 1024 * 1024;
/// In-memory cache of parsed table metadata, keyed by metadata file location.
///
/// Values are stored as `Arc<TableMetadata>`, so a cache hit hands out a
/// shared pointer rather than a copy of the metadata.
#[derive(Clone)]
pub(crate) struct MetadataCache {
// `None` means caching is disabled (constructed with `max_bytes == 0`);
// lookups then read the metadata file on every call.
inner: Option<Cache<String, Arc<TableMetadata>>>,
}
impl MetadataCache {
pub(crate) fn new(max_bytes: u64) -> Self {
let inner = if max_bytes == 0 {
None
} else {
Some(
Cache::builder()
.name("nornir-catalog.metadata")
.max_capacity(max_bytes)
.weigher(|_loc: &String, meta: &Arc<TableMetadata>| weight_of(meta))
.build(),
)
};
Self { inner }
}
pub(crate) async fn get_or_load(
&self,
fileio: &FileIO,
metadata_location: &str,
) -> Result<Arc<TableMetadata>> {
let Some(cache) = &self.inner else {
return TableMetadata::read_from(fileio, metadata_location)
.await
.map(Arc::new);
};
if let Some(hit) = cache.get(metadata_location).await {
return Ok(hit);
}
let fileio = fileio.clone();
let loc = metadata_location.to_string();
cache
.try_get_with_by_ref(metadata_location, async move {
TableMetadata::read_from(&fileio, &loc).await.map(Arc::new)
})
.await
.map_err(|e: Arc<Error>| Error::new(e.kind(), e.to_string()))
}
}
/// Compact diagnostic view: entry count and weighted size when the cache is
/// enabled, or `enabled: false` when it is not. Cached values are not printed.
impl std::fmt::Debug for MetadataCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MetadataCache");
        if let Some(cache) = self.inner.as_ref() {
            out.field("entries", &cache.entry_count());
            out.field("weighted_bytes", &cache.weighted_size());
        } else {
            out.field("enabled", &false);
        }
        out.finish()
    }
}
/// Estimates the cache weight of one metadata entry.
///
/// This is a heuristic built from fixed per-item costs, not a measurement of
/// the real in-memory size: a base cost, plus a flat cost per snapshot,
/// schema and metadata-log entry, plus the key and value lengths of every
/// table property with 32 bytes of overhead per property. The result
/// saturates at `u32::MAX`.
fn weight_of(meta: &TableMetadata) -> u32 {
    const BASE: usize = 4 * 1024;
    const PER_SNAPSHOT: usize = 1024;
    const PER_SCHEMA: usize = 2 * 1024;
    const PER_LOG: usize = 256;

    // Per-property cost: key length + value length + fixed overhead.
    let mut property_cost: usize = 0;
    for (key, value) in meta.properties() {
        property_cost += key.len() + value.len() + 32;
    }

    let snapshot_cost = meta.snapshots().len() * PER_SNAPSHOT;
    let schema_cost = meta.schemas_iter().len() * PER_SCHEMA;
    let log_cost = meta.metadata_log().len() * PER_LOG;

    let total = BASE + snapshot_cost + schema_cost + log_cost + property_cost;

    // Saturate on narrowing instead of truncating.
    if total > u32::MAX as usize {
        u32::MAX
    } else {
        total as u32
    }
}