pub mod frag_reuse;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::Dataset;
use crate::dataset::optimize::RemappedIndex;
use crate::dataset::optimize::remapping::RemapResult;
use crate::index::remap_index;
use crate::index::scalar::infer_scalar_index_details;
use arrow_schema::DataType;
use async_trait::async_trait;
use lance_core::{Error, Result};
use lance_encoding::version::LanceFileVersion;
use lance_index::DatasetIndexExt;
use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_table::format::IndexMetadata;
use lance_table::format::pb::VectorIndexDetails;
use serde::{Deserialize, Serialize};
use super::optimize::{IndexRemapper, IndexRemapperOptions};
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DatasetIndexRemapperOptions {}
impl IndexRemapperOptions for DatasetIndexRemapperOptions {
fn create_remapper(
&self,
dataset: &Dataset,
) -> crate::Result<Box<dyn super::optimize::IndexRemapper>> {
Ok(Box::new(DatasetIndexRemapper {
dataset: Arc::new(dataset.clone()),
}))
}
}
struct DatasetIndexRemapper {
dataset: Arc<Dataset>,
}
impl DatasetIndexRemapper {
async fn remap_index(
&self,
index: &IndexMetadata,
mapping: &HashMap<u64, Option<u64>>,
) -> Result<RemapResult> {
remap_index(&self.dataset, &index.uuid, mapping).await
}
}
#[async_trait]
impl IndexRemapper for DatasetIndexRemapper {
async fn remap_indices(
&self,
mapping: HashMap<u64, Option<u64>>,
affected_fragment_ids: &[u64],
) -> Result<Vec<RemappedIndex>> {
let affected_frag_ids = HashSet::<u64>::from_iter(affected_fragment_ids.iter().copied());
let indices = self.dataset.load_indices().await?;
let mut remapped = Vec::with_capacity(indices.len());
for index in indices.iter() {
let needs_remapped = index.name != FRAG_REUSE_INDEX_NAME
&& match &index.fragment_bitmap {
None => true,
Some(fragment_bitmap) => fragment_bitmap
.iter()
.any(|frag_idx| affected_frag_ids.contains(&(frag_idx as u64))),
};
if needs_remapped {
let remap_result = self.remap_index(index, &mapping).await?;
match remap_result {
RemapResult::Drop => continue,
RemapResult::Keep(id) => {
let index_details = match &index.index_details {
Some(index_details) => index_details.as_ref().clone(),
None => {
assert!(index.fields.len() == 1);
let field = index.fields.first().unwrap();
let field =
self.dataset.schema().field_by_id(*field).ok_or_else(|| {
Error::internal(format!(
"Index {} references field {} which does not exist",
index.uuid, field
))
})?;
if matches!(field.data_type(), DataType::FixedSizeList(..)) {
prost_types::Any::from_msg(&VectorIndexDetails::default())?
} else {
infer_scalar_index_details(&self.dataset, &field.name, index)
.await?
.as_ref()
.clone()
}
}
};
remapped.push(RemappedIndex {
old_id: id,
new_id: id,
index_details,
index_version: index.index_version as u32,
files: index.files.clone(),
});
}
RemapResult::Remapped(remapped_index) => {
remapped.push(remapped_index);
}
}
}
}
Ok(remapped)
}
}
pub trait LanceIndexStoreExt {
fn from_dataset_for_new(dataset: &Dataset, uuid: &str) -> Result<Self>
where
Self: Sized;
fn from_dataset_for_existing(dataset: &Dataset, index: &IndexMetadata) -> Result<Self>
where
Self: Sized;
}
pub(crate) fn dataset_format_version(dataset: &Dataset) -> LanceFileVersion {
dataset
.manifest
.data_storage_format
.lance_file_version()
.ok()
.map(|v| v.resolve().max(LanceFileVersion::V2_0))
.unwrap_or(LanceFileVersion::V2_0)
}
impl LanceIndexStoreExt for LanceIndexStore {
fn from_dataset_for_new(dataset: &Dataset, uuid: &str) -> Result<Self> {
let index_dir = dataset.indices_dir().child(uuid);
let cache = dataset.metadata_cache.file_metadata_cache(&index_dir);
let format_version = dataset_format_version(dataset);
Ok(Self::with_format_version(
dataset.object_store.clone(),
index_dir,
Arc::new(cache),
format_version,
))
}
fn from_dataset_for_existing(dataset: &Dataset, index: &IndexMetadata) -> Result<Self> {
let index_dir = dataset
.indice_files_dir(index)?
.child(index.uuid.to_string());
let cache = dataset.metadata_cache.file_metadata_cache(&index_dir);
let format_version = dataset_format_version(dataset);
let store = Self::with_format_version(
dataset.object_store.clone(),
index_dir,
Arc::new(cache),
format_version,
);
Ok(store.with_file_sizes(index.file_size_map()))
}
}