use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use uuid::Uuid;
/// Items textually included from the `lance.index.pb.rs` file found in the build's `OUT_DIR`.
///
/// NOTE(review): the `.pb.rs` suffix suggests protobuf-generated bindings — confirm against
/// the crate's build script.
pub mod pb {
// The included file comes out of the build's `OUT_DIR` rather than being written by hand,
// so the `use_self` lint is silenced for this module instead of being fixed at its source.
#![allow(clippy::use_self)]
include!(concat!(env!("OUT_DIR"), "/lance.index.pb.rs"));
}
pub(crate) mod append;
pub(crate) mod cache;
pub(crate) mod prefilter;
pub mod vector;
use crate::dataset::transaction::{Operation, Transaction};
use crate::format::Index as IndexMetadata;
use crate::index::append::append_index;
use crate::index::vector::remap_vector_index;
use crate::io::commit::commit_transaction;
use crate::{dataset::Dataset, Error, Result};
use self::vector::{build_vector_index, VectorIndexParams};
/// Crate-internal interface implemented by index objects.
///
/// Implementors must be `Send + Sync`.
pub(crate) trait Index: Send + Sync {
/// Returns this index as `&dyn Any`, which lets a caller attempt a downcast to a
/// concrete type.
fn as_any(&self) -> &dyn Any;
/// Returns statistics about this index as a JSON value.
///
/// The shape of the returned value is decided by each implementation.
fn statistics(&self) -> Result<serde_json::Value>;
}
/// The kind of index to build.
///
/// Each variant carries an explicit integer discriminant. The common value-type traits are
/// derived so the type can be debug-printed, compared, copied and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexType {
    /// Vector index; the only kind currently defined.
    Vector = 100,
}
impl fmt::Display for IndexType {
    /// Writes the human-readable name of the variant.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Exhaustive on purpose: adding a variant must force a decision about its label.
        let label = match self {
            Self::Vector => "Vector",
        };
        f.write_str(label)
    }
}
/// Interface for types that build an index.
#[async_trait]
pub trait IndexBuilder {
/// Returns the [`IndexType`] associated with this builder type.
///
/// This is an associated function: it takes no `self`.
fn index_type() -> IndexType;
/// Asynchronously runs the build.
///
/// What is built and where it is written is decided by the implementor.
async fn build(&self) -> Result<()>;
}
/// Type-erased parameters for building an index.
///
/// `create_index` receives these as `&dyn IndexParams` and downcasts them to the concrete
/// parameter type required by the requested [`IndexType`].
pub trait IndexParams: Send + Sync {
/// Returns the parameters as `&dyn Any` so they can be downcast to their concrete type.
fn as_any(&self) -> &dyn Any;
}
/// Builds a remapped copy of an existing index and returns the UUID of the copy.
///
/// The index identified by `index_id` is looked up among the dataset's indices and passed,
/// together with `row_id_map`, to [`remap_vector_index`], which is given a freshly generated
/// UUID for the result.
///
/// NOTE(review): `row_id_map` presumably maps old row ids to new ones, with `None` marking a
/// removed row — confirm against `remap_vector_index`.
///
/// # Errors
///
/// Returns [`Error::Index`] when:
/// - no index with `index_id` exists,
/// - the index covers no fields or more than one field,
/// - the indexed field id cannot be found in the dataset schema.
///
/// Errors from loading the indices or from the remap itself are propagated unchanged.
pub(crate) async fn remap_index(
    dataset: &Dataset,
    index_id: &Uuid,
    row_id_map: &HashMap<u64, Option<u64>>,
) -> Result<Uuid> {
    let indices = dataset.load_indices().await?;
    let matched = indices
        .iter()
        .find(|i| i.uuid == *index_id)
        .ok_or_else(|| Error::Index {
            message: format!("Index with id {} does not exist", index_id),
        })?;

    // Exactly one field is supported. Index metadata comes from storage, so a malformed
    // entry is reported as an error rather than aborting the process with a panic.
    let field_id = match matched.fields.as_slice() {
        [field_id] => *field_id,
        [] => {
            return Err(Error::Index {
                message: format!("Index with id {index_id} has no fields"),
            })
        }
        _ => {
            return Err(Error::Index {
                message: "Remapping indices with multiple fields is not supported".to_string(),
            })
        }
    };
    let field = dataset
        .schema()
        .field_by_id(field_id)
        .ok_or_else(|| Error::Index {
            message: format!(
                "Index with id {index_id} refers to field id {field_id}, \
                 which does not exist in the dataset schema"
            ),
        })?;

    let new_id = Uuid::new_v4();
    remap_vector_index(
        Arc::new(dataset.clone()),
        &field.name,
        index_id,
        &new_id,
        matched,
        row_id_map,
    )
    .await?;
    Ok(new_id)
}
/// Extension trait adding index management operations to a dataset.
#[async_trait]
pub trait DatasetIndexExt {
/// Creates an index on `columns` and commits it.
///
/// - `columns`: the columns to index. The `Dataset` implementation in this file accepts
///   exactly one column.
/// - `index_type`: the kind of index to build.
/// - `name`: the index name. The `Dataset` implementation falls back to `"{column}_idx"`
///   when this is `None`.
/// - `params`: build parameters; must be the concrete type matching `index_type`.
/// - `replace`: when `false`, the `Dataset` implementation rejects a name that is already
///   used by an index on the same column.
async fn create_index(
&mut self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<()>;
/// Brings existing indices up to date with the current dataset version.
///
/// In the `Dataset` implementation in this file, indices recorded at an older dataset
/// version are passed to `append_index`, and any index it rebuilds replaces the old entry.
async fn optimize_indices(&mut self) -> Result<()>;
}
#[async_trait]
impl DatasetIndexExt for Dataset {
    /// Builds an index on a single column and commits it as a new dataset version.
    ///
    /// When `name` is `None` the index is named `"{column}_idx"`. On success `self.manifest`
    /// is replaced with the manifest returned by the commit.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Index`] when:
    /// - `columns` does not contain exactly one column,
    /// - the column does not exist in the schema,
    /// - an index with the same name exists on a different field,
    /// - an index with the same name exists on the same field and `replace` is `false`,
    /// - `params` is not the parameter type required by `index_type`.
    ///
    /// Errors from loading indices, building the index, or committing are propagated.
    async fn create_index(
        &mut self,
        columns: &[&str],
        index_type: IndexType,
        name: Option<String>,
        params: &dyn IndexParams,
        replace: bool,
    ) -> Result<()> {
        if columns.len() != 1 {
            return Err(Error::Index {
                message: "Only support building index on 1 column at the moment".to_string(),
            });
        }
        let column = columns[0];
        let Some(field) = self.schema().field(column) else {
            return Err(Error::Index {
                message: format!("CreateIndex: column '{column}' does not exist"),
            });
        };

        let indices = self.load_indices().await?;
        // Only build the default name when the caller did not supply one.
        let index_name = name.unwrap_or_else(|| format!("{column}_idx"));

        if let Some(idx) = indices.iter().find(|i| i.name == index_name) {
            // A name already bound to other fields can never be reused, even with `replace`.
            if idx.fields != [field.id] {
                return Err(Error::Index {
                    message: format!(
                        "Index name '{index_name}' already exists with different fields, \
                        please specify a different name"
                    ),
                });
            }
            if !replace {
                return Err(Error::Index {
                    message: format!(
                        "Index name '{index_name}' already exists, \
                        please specify a different name or use replace=True"
                    ),
                });
            }
        }

        let index_id = Uuid::new_v4();
        match index_type {
            IndexType::Vector => {
                let vec_params = params
                    .as_any()
                    .downcast_ref::<VectorIndexParams>()
                    .ok_or_else(|| Error::Index {
                        message: "Vector index type must take a VectorIndexParams".to_string(),
                    })?;
                build_vector_index(self, column, &index_name, &index_id.to_string(), vec_params)
                    .await?;
            }
        }

        // The bitmap records the ids of every fragment present when the index was built.
        let new_idx = IndexMetadata {
            uuid: index_id,
            name: index_name,
            fields: vec![field.id],
            dataset_version: self.manifest.version,
            fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
        };
        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices: vec![],
            },
            None,
        );

        let new_manifest = commit_transaction(
            self,
            self.object_store(),
            &transaction,
            &Default::default(),
            &Default::default(),
        )
        .await?;
        self.manifest = Arc::new(new_manifest);
        Ok(())
    }

    /// Refreshes indices that were recorded at an older dataset version.
    ///
    /// Every index whose `dataset_version` differs from the current manifest version is passed
    /// to `append_index`. When that call yields a new UUID, a metadata entry with the same name
    /// and fields is created at the current version, with a fragment bitmap covering all
    /// current fragments, and the old entry is listed for removal. Nothing is committed when
    /// no index was rebuilt; otherwise `self.manifest` is replaced with the committed manifest.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading indices, from `append_index`, and from the commit.
    async fn optimize_indices(&mut self) -> Result<()> {
        let dataset = Arc::new(self.clone());
        let indices = self.load_indices().await?;

        let mut new_indices = vec![];
        let mut removed_indices = vec![];
        for idx in indices.iter() {
            // Already recorded at the current version: nothing to refresh.
            if idx.dataset_version == self.manifest.version {
                continue;
            }
            // `None` means `append_index` produced no replacement for this index.
            let Some(new_id) = append_index(dataset.clone(), idx).await? else {
                continue;
            };

            let new_idx = IndexMetadata {
                uuid: new_id,
                name: idx.name.clone(),
                fields: idx.fields.clone(),
                dataset_version: self.manifest.version,
                fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
            };
            removed_indices.push(idx.clone());
            new_indices.push(new_idx);
        }

        if new_indices.is_empty() {
            return Ok(());
        }

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            },
            None,
        );

        let new_manifest = commit_transaction(
            self,
            self.object_store(),
            &transaction,
            &Default::default(),
            &Default::default(),
        )
        .await?;
        self.manifest = Arc::new(new_manifest);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator};
    use arrow_schema::{DataType, Field, Schema};
    use lance_arrow::*;
    use lance_linalg::distance::MetricType;
    use lance_testing::datagen::generate_random_array;
    use tempfile::tempdir;

    /// Re-creating an index under its existing name succeeds with `replace = true`, while
    /// reusing a name that belongs to an index on a different column is rejected.
    #[tokio::test]
    async fn test_recreate_index() {
        const DIM: i32 = 8;
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "v",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
            Field::new(
                "o",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
        ]));
        let data = generate_random_array(2048 * DIM as usize);
        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
            ],
        )
        .unwrap()];

        let test_dir = tempdir().unwrap();
        let test_uri = test_dir.path().to_str().unwrap();
        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(2, 8, 2, false, MetricType::L2, 2);

        // First index on "v", using the default name "v_idx".
        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();
        // Index on "o", using the default name "o_idx".
        dataset
            .create_index(&["o"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // Same column and same default name again: allowed because `replace` is true.
        dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // "o_idx" already names the index on column "o", so it cannot be reused for "v".
        assert!(dataset
            .create_index(
                &["v"],
                IndexType::Vector,
                Some("o_idx".to_string()),
                &params,
                true,
            )
            .await
            .is_err());
    }
}