use std::collections::HashMap;
use std::sync::Arc;
use deepsize::DeepSizeOf;
use lance_core::cache::LanceCache;
use lance_core::{Error, Result};
use lance_index::IndexType;
use lance_io::object_store::ObjectStoreRegistry;
use crate::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
use crate::session::caches::GlobalMetadataCache;
use crate::session::index_caches::GlobalIndexCache;
use self::index_extension::IndexExtension;
pub(crate) mod caches;
pub(crate) mod index_caches;
pub(crate) mod index_extension;
#[derive(Clone)]
pub struct Session {
pub(crate) index_cache: GlobalIndexCache,
pub(crate) metadata_cache: caches::GlobalMetadataCache,
pub(crate) index_extensions: HashMap<(IndexType, String), Arc<dyn IndexExtension>>,
store_registry: Arc<ObjectStoreRegistry>,
}
impl DeepSizeOf for Session {
fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
let mut size = 0;
size += self.index_cache.deep_size_of_children(context);
size += self.metadata_cache.deep_size_of_children(context);
for ext in self.index_extensions.values() {
size += ext.deep_size_of_children(context);
}
size
}
}
impl std::fmt::Debug for Session {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field(
"index_cache",
&format!("IndexCache(items={})", self.index_cache.0.approx_size(),),
)
.field(
"file_metadata_cache",
&format!(
"LanceCache(items={}, size_bytes={})",
self.metadata_cache.0.approx_size(),
self.metadata_cache.0.approx_size_bytes(),
),
)
.field(
"index_extensions",
&self.index_extensions.keys().collect::<Vec<_>>(),
)
.finish()
}
}
impl Session {
pub fn new(
index_cache_size: usize,
metadata_cache_size: usize,
store_registry: Arc<ObjectStoreRegistry>,
) -> Self {
Self {
index_cache: GlobalIndexCache(LanceCache::with_capacity(index_cache_size)),
metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)),
index_extensions: HashMap::new(),
store_registry,
}
}
pub fn register_index_extension(
&mut self,
name: String,
extension: Arc<dyn IndexExtension>,
) -> Result<()> {
match extension.index_type() {
IndexType::Vector => {
if self
.index_extensions
.contains_key(&(IndexType::Vector, name.clone()))
{
return Err(Error::invalid_input(format!(
"{name} is already registered"
)));
}
if let Some(ext) = extension.to_vector() {
self.index_extensions
.insert((IndexType::Vector, name), ext.to_generic());
} else {
return Err(Error::invalid_input(format!(
"{name} is not a vector index extension"
)));
}
}
_ => {
return Err(Error::invalid_input(format!(
"scalar index extension is not support yet: {}",
extension.index_type()
)));
}
}
Ok(())
}
pub fn size_bytes(&self) -> u64 {
self.deep_size_of() as u64
}
pub fn approx_num_items(&self) -> usize {
self.index_cache.0.approx_size()
+ self.metadata_cache.0.approx_size()
+ self.index_extensions.len()
}
pub fn store_registry(&self) -> Arc<ObjectStoreRegistry> {
self.store_registry.clone()
}
pub async fn metadata_cache_stats(&self) -> lance_core::cache::CacheStats {
self.metadata_cache.0.stats().await
}
pub async fn index_cache_stats(&self) -> lance_core::cache::CacheStats {
self.index_cache.0.stats().await
}
}
impl Default for Session {
fn default() -> Self {
Self::new(
DEFAULT_INDEX_CACHE_SIZE,
DEFAULT_METADATA_CACHE_SIZE,
Arc::new(ObjectStoreRegistry::default()),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use lance_index::vector::VectorIndex;
#[tokio::test]
async fn test_disable_index_cache() {
let no_cache = Session::new(0, 0, Default::default());
assert!(
no_cache
.index_cache
.get_unsized::<dyn VectorIndex>("abc")
.await
.is_none()
);
}
}