use std::any::Any;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use uuid::Uuid;
/// Definitions pulled in verbatim from `lance.index.pb.rs`.
///
/// The file is read from the directory named by the `OUT_DIR` environment
/// variable at compile time, i.e. it is produced during the build rather than
/// checked in next to this module. Presumably these are protobuf bindings for
/// index metadata (going by the `.pb.rs` name) — TODO confirm against the
/// build script.
pub mod pb {
// The lint is silenced for everything included below; presumably because the
// included code is machine-generated and cannot be edited to satisfy
// `clippy::use_self` — TODO confirm.
#![allow(clippy::use_self)]
include!(concat!(env!("OUT_DIR"), "/lance.index.pb.rs"));
}
pub(crate) mod cache;
pub(crate) mod prefilter;
pub mod vector;
use crate::dataset::transaction::{Operation, Transaction};
use crate::format::Index as IndexMetadata;
use crate::io::commit::commit_transaction;
use crate::session::Session;
use crate::{dataset::Dataset, Error, Result};
use self::vector::{build_vector_index, VectorIndexParams};
/// Crate-internal interface implemented by index types.
///
/// Implementors must be `Send + Sync`.
pub(crate) trait Index: Send + Sync {
/// Returns `self` as a `&dyn Any`.
///
/// NOTE(review): presumably used to downcast to the concrete index type —
/// confirm against call sites.
fn as_any(&self) -> &dyn Any;
/// Returns a JSON value for this index, or an error.
///
/// NOTE(review): the shape of the JSON is not defined by this trait; check the
/// implementors before relying on specific keys.
fn statistics(&self) -> Result<serde_json::Value>;
}
/// The kind of index to create.
///
/// Variants carry explicit discriminants, so `IndexType::Vector as i32` is
/// `100`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexType {
    /// Vector index.
    Vector = 100,
}

impl fmt::Display for IndexType {
    /// Writes the variant name (for example `"Vector"`).
    ///
    /// Width and fill flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Exhaustive on purpose: adding a variant must force a label here.
        let label = match self {
            Self::Vector => "Vector",
        };
        f.write_str(label)
    }
}
/// Interface for types that build an index.
#[async_trait]
pub trait IndexBuilder {
/// Returns the [`IndexType`] associated with this builder.
///
/// This is an associated function (it takes no `self`), so it is called on
/// the implementing type rather than on a value.
fn index_type() -> IndexType;
/// Asynchronously runs the build, yielding `Ok(())` on success.
async fn build(&self) -> Result<()>;
}
/// Type-erased index build parameters, passed around as `&dyn IndexParams`.
///
/// `Dataset::create_index` in this file calls [`IndexParams::as_any`] and
/// downcasts the result to the concrete parameter type it expects
/// (`VectorIndexParams` for `IndexType::Vector`).
pub trait IndexParams: Send + Sync {
/// Returns `self` as a `&dyn Any` so it can be downcast to the concrete
/// parameter type.
fn as_any(&self) -> &dyn Any;
}
/// Extension trait that adds index creation to a dataset.
#[async_trait]
pub trait DatasetIndexExt {
/// Creates an index and returns the resulting [`Dataset`].
///
/// The notes below describe the `Dataset` implementation in this file.
///
/// * `columns` - columns to index; exactly one column is accepted.
/// * `index_type` - kind of index to build.
/// * `name` - index name; when `None`, `"{column}_idx"` is used.
/// * `params` - build parameters; must downcast to the parameter type that
///   matches `index_type` (`VectorIndexParams` for `IndexType::Vector`).
/// * `replace` - when `false`, it is an error if an index with the same name
///   already exists on the same column. An existing index with the same name
///   on a different column is always an error.
async fn create_index(
&self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<Dataset>;
}
#[async_trait]
impl DatasetIndexExt for Dataset {
    /// Builds an index over a single column, commits it as a `CreateIndex`
    /// transaction, and returns a `Dataset` that points at the new manifest.
    ///
    /// * `columns` - must contain exactly one column name.
    /// * `index_type` - kind of index to build.
    /// * `name` - index name; defaults to `"{column}_idx"`.
    /// * `params` - must be a `VectorIndexParams` when `index_type` is
    ///   `IndexType::Vector`.
    /// * `replace` - permits reusing the name of an existing index that covers
    ///   the same column.
    ///
    /// # Errors
    ///
    /// Returns `Error::Index` when:
    /// * `columns` does not contain exactly one entry,
    /// * the column is not part of the schema,
    /// * an index with the same name exists on a different column,
    /// * an index with the same name exists on the same column and `replace`
    ///   is `false`,
    /// * `params` is not the parameter type required by `index_type`.
    ///
    /// Errors from loading indices, building the index, or committing the
    /// transaction are propagated unchanged.
    async fn create_index(
        &self,
        columns: &[&str],
        index_type: IndexType,
        name: Option<String>,
        params: &dyn IndexParams,
        replace: bool,
    ) -> Result<Self> {
        // Only single-column indices are supported; the slice pattern both
        // checks the length and extracts the column without indexing.
        let &[column] = columns else {
            return Err(Error::Index {
                message: "Only support building index on 1 column at the moment".to_string(),
            });
        };
        let Some(field) = self.schema().field(column) else {
            return Err(Error::Index {
                message: format!("CreateIndex: column '{column}' does not exist"),
            });
        };

        let indices = self.load_indices().await?;
        // Build the default name lazily so no string is allocated when the
        // caller supplied one.
        let index_name = name.unwrap_or_else(|| format!("{column}_idx"));

        // Reject name clashes before doing any expensive build work.
        if let Some(existing) = indices.iter().find(|idx| idx.name == index_name) {
            if existing.fields != [field.id] {
                return Err(Error::Index {
                    message: format!(
                        "Index name '{index_name}' already exists with different fields, \
                         please specify a different name"
                    ),
                });
            }
            if !replace {
                return Err(Error::Index {
                    message: format!(
                        "Index name '{index_name}' already exists, \
                         please specify a different name or use replace=True"
                    ),
                });
            }
        }

        let index_id = Uuid::new_v4();
        match index_type {
            IndexType::Vector => {
                // The parameters arrive type-erased; recover the concrete type.
                let vec_params = params
                    .as_any()
                    .downcast_ref::<VectorIndexParams>()
                    .ok_or_else(|| Error::Index {
                        message: "Vector index type must take a VectorIndexParams".to_string(),
                    })?;
                build_vector_index(self, column, &index_name, &index_id.to_string(), vec_params)
                    .await?;
            }
        }

        // Record the new index against the manifest version it was built from.
        let new_idx =
            IndexMetadata::new(index_id, &index_name, &[field.id], self.manifest.version);
        let transaction = Transaction::new(
            self.manifest.version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
            },
            None,
        );
        let new_manifest = commit_transaction(
            self,
            self.object_store(),
            &transaction,
            &Default::default(),
            &Default::default(),
        )
        .await?;

        Ok(Self {
            object_store: self.object_store.clone(),
            base: self.base.clone(),
            manifest: Arc::new(new_manifest),
            // NOTE(review): the returned dataset gets a fresh default session
            // rather than sharing `self`'s — confirm this is intentional.
            session: Arc::new(Session::default()),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator};
    use arrow_schema::{DataType, Field, Schema};
    use tempfile::tempdir;

    use crate::{arrow::*, index::vector::MetricType, utils::datagen::generate_random_array};

    /// Creating an index again under the same name with `replace = true`
    /// succeeds, while reusing a name that belongs to an index on a different
    /// column fails.
    #[tokio::test]
    async fn test_recreate_index() {
        const DIM: i32 = 8;
        // Two vector columns, "v" and "o", holding the same values.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "v",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
            Field::new(
                "o",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), DIM),
                true,
            ),
        ]));
        let data = generate_random_array(2048 * DIM as usize);
        let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(FixedSizeListArray::try_new_from_values(data.clone(), DIM).unwrap()),
                Arc::new(FixedSizeListArray::try_new_from_values(data, DIM).unwrap()),
            ],
        )
        .unwrap()];

        let test_dir = tempdir().unwrap();
        let test_uri = test_dir.path().to_str().unwrap();
        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let params = VectorIndexParams::ivf_pq(2, 8, 2, false, MetricType::L2, 2);

        // First index on "v"; the default name is "v_idx".
        let dataset = dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();
        // Index on "o"; the default name is "o_idx".
        let dataset = dataset
            .create_index(&["o"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // Same column and same name with `replace = true` is accepted.
        let dataset = dataset
            .create_index(&["v"], IndexType::Vector, None, &params, true)
            .await
            .unwrap();

        // "o_idx" already names the index on column "o", so using it for "v"
        // must fail even with `replace = true`.
        assert!(dataset
            .create_index(
                &["v"],
                IndexType::Vector,
                Some("o_idx".to_string()),
                &params,
                true,
            )
            .await
            .is_err());
    }
}