use std::sync::Arc;
use async_trait::async_trait;
use lance_core::{Error, Result};
use lance_index::DatasetIndexExt;
use lance_index::mem_wal::{MEM_WAL_INDEX_NAME, MemWalIndexDetails, RegionSpec};
use lance_index::vector::ivf::storage::IvfModel;
use lance_index::vector::pq::ProductQuantizer;
use lance_io::object_store::ObjectStore;
use lance_linalg::distance::DistanceType;
use uuid::Uuid;
use crate::Dataset;
use crate::dataset::CommitBuilder;
use crate::dataset::transaction::{Operation, Transaction};
use crate::index::DatasetIndexInternalExt;
use crate::index::mem_wal::new_mem_wal_index_meta;
use super::RegionWriterConfig;
use super::write::MemIndexConfig;
use super::write::RegionWriter;
/// Configuration passed to [`DatasetMemWalExt::initialize_mem_wal`].
#[derive(Debug, Clone, Default)]
pub struct MemWalConfig {
    /// Optional region spec. When `Some`, it is stored as the only entry of the
    /// MemWAL index's `region_specs`; when `None`, no region spec is recorded.
    pub region_spec: Option<RegionSpec>,
    /// Names of indexes that MemWAL region writers should maintain.
    /// `initialize_mem_wal` requires each name to refer to an index that already
    /// exists on the dataset; `mem_wal_writer` builds one in-memory index config
    /// per entry.
    pub maintained_indexes: Vec<String>,
}
/// MemWAL entry points on a dataset.
#[async_trait]
pub trait DatasetMemWalExt {
    /// Enable MemWAL on the dataset by committing the MemWAL system index built
    /// from `config`. On success the implementation for `Dataset` replaces
    /// `self` with the newly committed version.
    async fn initialize_mem_wal(&mut self, config: MemWalConfig) -> Result<()>;
    /// Open a region writer for `region_id` on a dataset whose MemWAL has
    /// already been initialized. `config.region_id` is overwritten with
    /// `region_id`.
    async fn mem_wal_writer(
        &self,
        region_id: Uuid,
        config: RegionWriterConfig,
    ) -> Result<RegionWriter>;
}
#[async_trait]
impl DatasetMemWalExt for Dataset {
async fn initialize_mem_wal(&mut self, config: MemWalConfig) -> Result<()> {
let pk_fields = self.schema().unenforced_primary_key();
if pk_fields.is_empty() {
return Err(Error::invalid_input(
"MemWAL requires a primary key on the dataset. \
Define a primary key using the 'lance-schema:unenforced-primary-key' Arrow field metadata.",
));
}
let indices = self.load_indices().await?;
for index_name in &config.maintained_indexes {
if !indices.iter().any(|idx| &idx.name == index_name) {
return Err(Error::invalid_input(format!(
"Index '{}' not found on dataset. maintained_indexes must reference existing indexes.",
index_name
)));
}
}
if indices.iter().any(|idx| idx.name == MEM_WAL_INDEX_NAME) {
return Err(Error::invalid_input(
"MemWAL is already initialized on this dataset. Use update methods instead.",
));
}
let details = MemWalIndexDetails {
region_specs: config.region_spec.into_iter().collect(),
maintained_indexes: config.maintained_indexes,
..Default::default()
};
let index_meta = new_mem_wal_index_meta(self.manifest.version, details)?;
let transaction = Transaction::new(
self.manifest.version,
Operation::CreateIndex {
new_indices: vec![index_meta],
removed_indices: vec![],
},
None,
);
let new_dataset = CommitBuilder::new(Arc::new(self.clone()))
.execute(transaction)
.await?;
*self = new_dataset;
Ok(())
}
async fn mem_wal_writer(
&self,
region_id: Uuid,
mut config: RegionWriterConfig,
) -> Result<RegionWriter> {
use lance_index::metrics::NoOpMetricsCollector;
let mem_wal_index = self
.open_mem_wal_index(&NoOpMetricsCollector)
.await?
.ok_or_else(|| {
Error::invalid_input(
"MemWAL is not initialized on this dataset. Call initialize_mem_wal() first.",
)
})?;
let maintained_indexes = &mem_wal_index.details.maintained_indexes;
let mut index_configs = Vec::new();
for index_name in maintained_indexes {
let index_meta = self.load_index_by_name(index_name).await?.ok_or_else(|| {
Error::invalid_input(format!(
"Index '{}' from maintained_indexes not found on dataset",
index_name
))
})?;
let type_url = index_meta
.index_details
.as_ref()
.map(|d| d.type_url.as_str())
.unwrap_or("");
let index_type = MemIndexConfig::detect_index_type(type_url)?;
match index_type {
"btree" => {
index_configs.push(MemIndexConfig::btree_from_metadata(
&index_meta,
self.schema(),
)?);
}
"fts" => {
index_configs.push(MemIndexConfig::fts_from_metadata(
&index_meta,
self.schema(),
)?);
}
"vector" => {
let vector_config =
load_vector_index_config(self, index_name, &index_meta).await?;
index_configs.push(vector_config);
}
_ => {
return Err(Error::invalid_input(format!(
"Unknown index type: {}",
index_type
)));
}
};
}
config.region_id = region_id;
let base_uri = self.uri();
let (store, base_path) = ObjectStore::from_uri(base_uri).await?;
RegionWriter::open(
store,
base_path,
base_uri,
config,
Arc::new(self.schema().into()),
index_configs,
)
.await
}
}
async fn load_vector_index_config(
dataset: &Dataset,
index_name: &str,
index_meta: &lance_table::format::IndexMetadata,
) -> Result<MemIndexConfig> {
use lance_index::metrics::NoOpMetricsCollector;
let field_id = index_meta.fields.first().ok_or_else(|| {
Error::invalid_input(format!("Vector index '{}' has no fields", index_name))
})?;
let field = dataset.schema().field_by_id(*field_id).ok_or_else(|| {
Error::invalid_input(format!("Field not found for vector index '{}'", index_name))
})?;
let column = field.name.clone();
let index_uuid = index_meta.uuid.to_string();
let (ivf_model, pq, distance_type) = load_ivf_pq_components(
dataset,
index_name,
&index_uuid,
&column,
&NoOpMetricsCollector,
)
.await?;
Ok(MemIndexConfig::ivf_pq(
index_name.to_string(),
*field_id,
column,
ivf_model,
pq,
distance_type,
))
}
async fn load_ivf_pq_components(
dataset: &Dataset,
index_name: &str,
index_uuid: &str,
column_name: &str,
metrics: &dyn lance_index::metrics::MetricsCollector,
) -> Result<(IvfModel, ProductQuantizer, DistanceType)> {
use crate::index::vector::ivf::v2::IvfPq;
use lance_index::vector::VectorIndex;
let index = dataset
.open_vector_index(column_name, index_uuid, metrics)
.await?;
let ivf_index = index.as_any().downcast_ref::<IvfPq>().ok_or_else(|| {
Error::invalid_input(format!(
"Vector index '{}' is not an IVF-PQ index. Only IVF-PQ indexes are supported for MemWAL.",
index_name
))
})?;
let ivf_model = ivf_index.ivf_model().clone();
let distance_type = ivf_index.metric_type();
let quantizer = ivf_index.quantizer();
let pq = ProductQuantizer::try_from(quantizer)?;
Ok((ivf_model, pq, distance_type))
}