use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::StreamExt;
use lance_io::object_store::ObjectStore;
use object_store::path::Path;
use roaring::RoaringBitmap;
use uuid::Uuid;
use super::pb;
use lance_core::{Error, Result};
/// A single file belonging to an index, together with its size.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct IndexFile {
/// Path of the file. When produced by `list_index_files_with_sizes` this is
/// the object location with the index directory prefix stripped.
pub path: String,
/// Size of the file in bytes.
pub size_bytes: u64,
}
/// In-memory description of an index, convertible to and from
/// `pb::IndexMetadata`.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexMetadata {
/// Identifier of the index. Required when decoding from protobuf.
pub uuid: Uuid,
/// Presumably the ids of the fields (columns) the index covers -- confirm
/// against the protobuf definition.
pub fields: Vec<i32>,
/// Name of the index.
pub name: String,
/// Dataset version associated with the index (presumably the version the
/// index was built against -- confirm).
pub dataset_version: u64,
/// Bitmap of fragment ids associated with the index.
///
/// `None` when the protobuf message carried no bitmap bytes.
pub fragment_bitmap: Option<RoaringBitmap>,
/// Opaque, index-specific details carried as a protobuf `Any`.
///
/// NOTE: not included in the `DeepSizeOf` accounting for this type.
pub index_details: Option<Arc<prost_types::Any>>,
/// Version number of the index; decodes to `0` when absent from the
/// protobuf message.
pub index_version: i32,
/// Creation time, stored in protobuf as milliseconds since the Unix epoch.
pub created_at: Option<DateTime<Utc>>,
/// Copied verbatim to and from the protobuf message; its meaning is not
/// visible in this file.
pub base_id: Option<u32>,
/// Files that make up the index, with their sizes.
///
/// `None` when the protobuf message listed no files.
pub files: Option<Vec<IndexFile>>,
}
impl IndexMetadata {
    /// Returns the fragments recorded for this index that are also present in
    /// `existing_fragments` (set intersection).
    ///
    /// Yields `None` when the index has no fragment bitmap.
    pub fn effective_fragment_bitmap(
        &self,
        existing_fragments: &RoaringBitmap,
    ) -> Option<RoaringBitmap> {
        self.fragment_bitmap
            .as_ref()
            .map(|indexed| indexed & existing_fragments)
    }

    /// Builds a lookup from file path to file size in bytes.
    ///
    /// The map is empty when no file list is recorded.
    pub fn file_size_map(&self) -> HashMap<String, u64> {
        match &self.files {
            Some(entries) => entries
                .iter()
                .map(|entry| (entry.path.clone(), entry.size_bytes))
                .collect(),
            None => HashMap::default(),
        }
    }

    /// Sums the sizes of all recorded index files.
    ///
    /// Yields `None` when no file list is recorded, and `Some(0)` for an empty
    /// list.
    pub fn total_size_bytes(&self) -> Option<u64> {
        let entries = self.files.as_ref()?;
        Some(entries.iter().map(|entry| entry.size_bytes).sum::<u64>())
    }

    /// Returns the fragments recorded for this index that are absent from
    /// `existing_fragments` (set difference).
    ///
    /// Yields `None` when the index has no fragment bitmap.
    pub fn deleted_fragment_bitmap(
        &self,
        existing_fragments: &RoaringBitmap,
    ) -> Option<RoaringBitmap> {
        self.fragment_bitmap
            .as_ref()
            .map(|indexed| indexed - existing_fragments)
    }
}
impl DeepSizeOf for IndexMetadata {
    /// Estimates the heap memory owned by this metadata.
    ///
    /// Counts `uuid`, `fields`, `name`, `dataset_version` and `files` through
    /// their own `DeepSizeOf` impls, and approximates the fragment bitmap by
    /// its serialized size. The remaining fields are not counted.
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        let mut total = self.uuid.as_bytes().deep_size_of_children(context);
        total += self.fields.deep_size_of_children(context);
        total += self.name.deep_size_of_children(context);
        total += self.dataset_version.deep_size_of_children(context);
        // The bitmap has no `DeepSizeOf` impl here; use its serialized size.
        total += match &self.fragment_bitmap {
            Some(bitmap) => bitmap.serialized_size(),
            None => 0,
        };
        total += self.files.deep_size_of_children(context);
        total
    }
}
impl TryFrom<pb::IndexMetadata> for IndexMetadata {
type Error = Error;
fn try_from(proto: pb::IndexMetadata) -> Result<Self> {
let fragment_bitmap = if proto.fragment_bitmap.is_empty() {
None
} else {
Some(RoaringBitmap::deserialize_from(
&mut proto.fragment_bitmap.as_slice(),
)?)
};
let files = if proto.files.is_empty() {
None
} else {
Some(
proto
.files
.into_iter()
.map(|f| IndexFile {
path: f.path,
size_bytes: f.size_bytes,
})
.collect(),
)
};
Ok(Self {
uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| {
Error::invalid_input("uuid field does not exist in Index metadata".to_string())
})??,
name: proto.name,
fields: proto.fields,
dataset_version: proto.dataset_version,
fragment_bitmap,
index_details: proto.index_details.map(Arc::new),
index_version: proto.index_version.unwrap_or_default(),
created_at: proto.created_at.map(|ts| {
DateTime::from_timestamp_millis(ts as i64)
.expect("Invalid timestamp in index metadata")
}),
base_id: proto.base_id,
files,
})
}
}
impl From<&IndexMetadata> for pb::IndexMetadata {
    /// Encodes index metadata into its protobuf representation.
    ///
    /// A missing fragment bitmap is written as an empty byte buffer and a
    /// missing file list as an empty list. If serializing the bitmap fails,
    /// the error is logged and an empty buffer is written instead.
    fn from(idx: &IndexMetadata) -> Self {
        let fragment_bitmap = idx
            .fragment_bitmap
            .as_ref()
            .map(|bitmap| {
                let mut bytes = Vec::new();
                if let Err(e) = bitmap.serialize_into(&mut bytes) {
                    log::error!("Failed to serialize fragment bitmap: {}", e);
                    // Drop any partial output so a truncated bitmap is never stored.
                    bytes.clear();
                }
                bytes
            })
            .unwrap_or_default();

        let files = match &idx.files {
            Some(entries) => entries
                .iter()
                .map(|entry| pb::IndexFile {
                    path: entry.path.clone(),
                    size_bytes: entry.size_bytes,
                })
                .collect(),
            None => Default::default(),
        };

        Self {
            uuid: Some((&idx.uuid).into()),
            name: idx.name.clone(),
            fields: idx.fields.clone(),
            dataset_version: idx.dataset_version,
            fragment_bitmap,
            index_details: idx.index_details.as_deref().cloned(),
            index_version: Some(idx.index_version),
            // Stored as milliseconds since the Unix epoch.
            created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
            base_id: idx.base_id,
            files,
        }
    }
}
/// Lists every object under `index_dir` together with its size.
///
/// Each returned path has the `index_dir` prefix and any leading `/` removed.
/// If a location does not start with that prefix, the object's file name is
/// used instead (or an empty string when it has none).
///
/// # Errors
///
/// Returns the first error yielded by the directory listing.
pub async fn list_index_files_with_sizes(
    object_store: &ObjectStore,
    index_dir: &Path,
) -> Result<Vec<IndexFile>> {
    let dir_prefix: &str = index_dir.as_ref();
    let mut listing = object_store.read_dir_all(index_dir, None);
    let mut found = Vec::new();

    while let Some(entry) = listing.next().await {
        let entry = entry?;
        let path = match entry.location.as_ref().strip_prefix(dir_prefix) {
            Some(rest) => rest.trim_start_matches('/').to_string(),
            None => entry.location.filename().unwrap_or("").to_string(),
        };
        found.push(IndexFile {
            path,
            size_bytes: entry.size,
        });
    }

    Ok(found)
}