use std::any::Any;
use std::sync::Arc;
pub mod diskann;
#[allow(dead_code)]
mod graph;
pub mod ivf;
#[cfg(feature = "opq")]
pub mod opq;
pub mod pq;
mod traits;
mod utils;
use lance_core::io::Reader;
use lance_index::vector::{ivf::IvfBuildParams, pq::PQBuildParams};
use lance_linalg::distance::*;
use nohash_hasher::IntMap;
use object_store::path::Path;
use snafu::{location, Location};
use tracing::instrument;
use uuid::Uuid;
use self::{
ivf::{build_ivf_pq_index, remap_index_file, IVFIndex},
pq::PQIndex,
};
use super::{pb, DatasetIndexInternalExt, IndexParams};
#[cfg(feature = "opq")]
use crate::index::vector::opq::{OPQIndex, OptimizedProductQuantizer};
use crate::{
dataset::Dataset,
index::{
pb::vector_index_stage::Stage,
vector::{
diskann::{DiskANNIndex, DiskANNParams},
ivf::Ivf,
},
},
Error, Result,
};
pub use traits::*;
/// Build parameters for a single stage of a vector index.
///
/// A [`VectorIndexParams`] holds an ordered list of these stages.
#[derive(Debug, Clone)]
pub enum StageParams {
/// Parameters for an IVF stage.
Ivf(IvfBuildParams),
/// Parameters for a PQ stage.
PQ(PQBuildParams),
/// Parameters for a DiskANN stage.
DiskANN(DiskANNParams),
}
/// Parameters describing how to build a vector index.
#[derive(Debug, Clone)]
pub struct VectorIndexParams {
/// Ordered stages of the index. `build_vector_index` in this module looks
/// at the trailing stages to decide what to build: an IVF stage followed by
/// a PQ stage, or a DiskANN stage.
pub stages: Vec<StageParams>,
/// Metric type of the index; `build_vector_index` forwards it to the
/// IVF/PQ index builder.
pub metric_type: MetricType,
}
impl VectorIndexParams {
    /// Creates parameters for an index made of an IVF stage followed by a PQ stage.
    ///
    /// # Arguments
    /// * `num_partitions` - passed to `IvfBuildParams::new`.
    /// * `num_bits` - stored (widened to `usize`) as the PQ `num_bits`.
    /// * `num_sub_vectors` - stored as the PQ `num_sub_vectors`.
    /// * `use_opq` - stored as the PQ `use_opq` flag.
    /// * `metric_type` - metric type recorded on the returned parameters.
    /// * `max_iterations` - used for both the PQ `max_iters` and `max_opq_iters`.
    ///
    /// All remaining PQ fields keep their `Default` values.
    pub fn ivf_pq(
        num_partitions: usize,
        num_bits: u8,
        num_sub_vectors: usize,
        use_opq: bool,
        metric_type: MetricType,
        max_iterations: usize,
    ) -> Self {
        let ivf = IvfBuildParams::new(num_partitions);
        let pq = PQBuildParams {
            num_bits: usize::from(num_bits),
            num_sub_vectors,
            use_opq,
            max_iters: max_iterations,
            max_opq_iters: max_iterations,
            ..Default::default()
        };
        Self::with_ivf_pq_params(metric_type, ivf, pq)
    }

    /// Creates parameters for an IVF stage followed by a PQ stage from
    /// already-built stage parameters.
    pub fn with_ivf_pq_params(
        metric_type: MetricType,
        ivf: IvfBuildParams,
        pq: PQBuildParams,
    ) -> Self {
        Self {
            stages: vec![StageParams::Ivf(ivf), StageParams::PQ(pq)],
            metric_type,
        }
    }

    /// Creates parameters whose only stage is a DiskANN stage.
    pub fn with_diskann_params(metric_type: MetricType, diskann: DiskANNParams) -> Self {
        Self {
            stages: vec![StageParams::DiskANN(diskann)],
            metric_type,
        }
    }
}
impl IndexParams for VectorIndexParams {
/// Returns `self` as `&dyn Any`.
// NOTE(review): presumably lets callers holding a `dyn IndexParams` downcast
// back to `VectorIndexParams` — confirm against call sites.
fn as_any(&self) -> &dyn Any {
self
}
}
/// Returns `true` when the last two stages are an IVF stage immediately
/// followed by a PQ stage. Fewer than two stages always yields `false`.
fn is_ivf_pq(stages: &[StageParams]) -> bool {
    matches!(stages, [.., StageParams::Ivf(_), StageParams::PQ(_)])
}
/// Returns `true` when the final stage is a DiskANN stage. An empty stage
/// list yields `false`.
fn is_diskann(stages: &[StageParams]) -> bool {
    matches!(stages.last(), Some(StageParams::DiskANN(_)))
}
/// Builds a vector index for `column` of `dataset` according to `params`.
///
/// The trailing stages of `params.stages` select the builder:
/// - an IVF stage followed by a PQ stage builds an IVF/PQ index via
///   `build_ivf_pq_index`, using `params.metric_type`;
/// - a trailing DiskANN stage builds a DiskANN index via
///   `build_diskann_index`.
///
/// `name` and `uuid` are forwarded unchanged to the selected builder.
///
/// # Errors
/// Returns `Error::Index` when `params.stages` is empty or does not end in
/// one of the supported stage combinations. Errors from the selected
/// builder are propagated.
#[instrument(skip(dataset))]
pub(crate) async fn build_vector_index(
    dataset: &Dataset,
    column: &str,
    name: &str,
    uuid: &str,
    params: &VectorIndexParams,
) -> Result<()> {
    let stages = &params.stages;
    if stages.is_empty() {
        return Err(Error::Index {
            message: "Build Vector Index: must have at least 1 stage".to_string(),
            location: location!(),
        });
    }

    if is_ivf_pq(stages) {
        // `is_ivf_pq` already guarantees this shape; destructure both trailing
        // stages in one step and keep the error path instead of panicking.
        let [.., StageParams::Ivf(ivf_params), StageParams::PQ(pq_params)] = stages.as_slice()
        else {
            return Err(Error::Index {
                message: format!("Build Vector Index: invalid stages: {:?}", stages),
                location: location!(),
            });
        };
        build_ivf_pq_index(
            dataset,
            column,
            name,
            uuid,
            params.metric_type,
            ivf_params,
            pq_params,
        )
        .await?;
    } else if is_diskann(stages) {
        use self::diskann::build_diskann_index;

        // Bind under a distinct name so the outer `params` is not shadowed.
        let Some(StageParams::DiskANN(diskann_params)) = stages.last() else {
            return Err(Error::Index {
                message: format!("Build Vector Index: invalid stages: {:?}", stages),
                location: location!(),
            });
        };
        build_diskann_index(dataset, column, name, uuid, diskann_params.clone()).await?;
    } else {
        return Err(Error::Index {
            message: format!("Build Vector Index: invalid stages: {:?}", stages),
            location: location!(),
        });
    }

    Ok(())
}
#[instrument(skip_all, fields(old_uuid = old_uuid.to_string(), new_uuid = new_uuid.to_string(), num_rows = mapping.len()))]
pub(crate) async fn remap_vector_index(
dataset: Arc<Dataset>,
column: &str,
old_uuid: &Uuid,
new_uuid: &Uuid,
old_metadata: &crate::format::Index,
mapping: &IntMap<u64, Option<u64>>,
) -> Result<()> {
let old_index = dataset
.open_vector_index(column, &old_uuid.to_string())
.await?;
old_index.check_can_remap()?;
let ivf_index: &IVFIndex =
old_index
.as_any()
.downcast_ref()
.ok_or_else(|| Error::NotSupported {
source: "Only IVF indexes can be remapped currently".into(),
location: location!(),
})?;
remap_index_file(
dataset.as_ref(),
&old_uuid.to_string(),
&new_uuid.to_string(),
old_metadata.dataset_version,
ivf_index,
mapping,
old_metadata.name.clone(),
column.to_string(),
vec![],
)
.await?;
Ok(())
}
/// Reconstructs a [`VectorIndex`] from its protobuf description `vec_idx`.
///
/// Stages are visited from last to first, so the last stage is materialized
/// first and every earlier stage wraps the index built so far:
/// - a PQ or DiskANN stage is only accepted while nothing has been built yet;
/// - an IVF or transform stage requires an already-built inner index.
///
/// With the `opq` feature enabled an OPQ transform wraps the inner index in
/// an `OPQIndex`; without it a transform stage is validated and otherwise
/// skipped. Stage entries of any other kind (including an unset `stage`) are
/// ignored.
///
/// The resulting index is inserted into `dataset.session.index_cache` under
/// `uuid` before it is returned.
///
/// # Errors
/// Returns `Error::Index` when the stage ordering violates the rules above
/// or when no index was produced. Errors from decoding the metric type and
/// from loading or constructing the individual stages are propagated.
#[instrument(level = "debug", skip(dataset, vec_idx, index_dir, reader))]
pub(crate) async fn open_vector_index(
    dataset: Arc<Dataset>,
    column: &str,
    uuid: &str,
    vec_idx: &lance_index::pb::VectorIndex,
    index_dir: Path,
    reader: Arc<dyn Reader>,
) -> Result<Arc<dyn VectorIndex>> {
    let metric_type = pb::VectorMetricType::try_from(vec_idx.metric_type)?.into();

    // Index assembled so far, innermost stage first.
    let mut current: Option<Arc<dyn VectorIndex>> = None;

    for stage_pb in vec_idx.stages.iter().rev() {
        match stage_pb.stage.as_ref() {
            // `tf` is only read when the `opq` feature is enabled.
            #[allow(unused_variables)]
            Some(Stage::Transform(tf)) => {
                if current.is_none() {
                    return Err(Error::Index {
                        message: format!("Invalid vector index stages: {:?}", vec_idx.stages),
                        location: location!(),
                    });
                }
                #[cfg(feature = "opq")]
                match tf.r#type() {
                    pb::TransformType::Opq => {
                        let shape: Vec<usize> =
                            tf.shape.iter().map(|&dim| dim as usize).collect();
                        let opq = OptimizedProductQuantizer::load(
                            reader.as_ref(),
                            tf.position as usize,
                            shape.as_slice(),
                        )
                        .await?;
                        // `current` was verified to be `Some` at the top of this arm.
                        current = Some(Arc::new(OPQIndex::new(
                            current.as_ref().unwrap().clone(),
                            opq,
                        )));
                    }
                }
            }
            Some(Stage::Ivf(ivf_pb)) => {
                // IVF wraps the index built so far, so one must already exist.
                let Some(sub_index) = current.take() else {
                    return Err(Error::Index {
                        message: format!("Invalid vector index stages: {:?}", vec_idx.stages),
                        location: location!(),
                    });
                };
                let ivf = Ivf::try_from(ivf_pb)?;
                current = Some(Arc::new(IVFIndex::try_new(
                    dataset.session.clone(),
                    uuid,
                    ivf,
                    reader.clone(),
                    sub_index,
                    metric_type,
                )?));
            }
            Some(Stage::Pq(pq_proto)) => {
                // PQ is only valid as the first stage visited.
                if current.is_some() {
                    return Err(Error::Index {
                        message: format!("Invalid vector index stages: {:?}", vec_idx.stages),
                        location: location!(),
                    });
                }
                let pq = lance_index::vector::pq::builder::from_proto(pq_proto, metric_type)?;
                current = Some(Arc::new(PQIndex::new(pq, metric_type)));
            }
            Some(Stage::Diskann(diskann_proto)) => {
                if current.is_some() {
                    return Err(Error::Index {
                        message: format!(
                            "DiskANN should be the only stage, but we got stages: {:?}",
                            vec_idx.stages
                        ),
                        location: location!(),
                    });
                }
                let graph_path = index_dir.child(diskann_proto.filename.as_str());
                let diskann =
                    Arc::new(DiskANNIndex::try_new(dataset.clone(), column, &graph_path).await?);
                current = Some(diskann);
            }
            _ => {}
        }
    }

    let Some(index) = current else {
        return Err(Error::Index {
            message: format!("Invalid index stages: {:?}", vec_idx.stages),
            location: location!(),
        });
    };
    dataset.session.index_cache.insert_vector(uuid, index.clone());
    Ok(index)
}