use manifoldb_core::{Entity, EntityId, PointId, TransactionError, Value};
use manifoldb_storage::Transaction;
use manifoldb_vector::distance::DistanceMetric;
use manifoldb_vector::index::{
clear_index_tx, delete_node_tx, hnsw_table_name, load_graph_tx, load_metadata_tx,
save_graph_tx, search_layer_filtered, FilteredSearchConfig, HnswConfig, HnswGraph,
HnswIndexEntry, HnswNode, HnswRegistry, SearchResult,
};
use manifoldb_vector::types::Embedding;
use crate::collection::VectorConfig;
use crate::transaction::DatabaseTransaction;
/// Errors produced by the HNSW vector-index helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum VectorIndexError {
    /// An index with the given name is already registered.
    #[error("Index already exists: {0}")]
    IndexExists(String),
    /// No index with the given name is registered (or its metadata is missing).
    #[error("Index not found: {0}")]
    IndexNotFound(String),
    /// The referenced table does not exist.
    #[error("Table not found: {0}")]
    TableNotFound(String),
    /// The referenced column does not exist.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),
    /// A vector's length differs from the index's dimension.
    #[error("Dimension mismatch: expected {expected}, got {actual}")]
    DimensionMismatch {
        /// Dimension the index (or first-seen vector) established.
        expected: usize,
        /// Dimension of the offending vector.
        actual: usize,
    },
    /// An entity property could not be converted into an embedding.
    #[error("Invalid vector in entity {entity_id}: {reason}")]
    InvalidVector {
        /// Entity whose property was rejected.
        entity_id: EntityId,
        /// Human-readable conversion failure.
        reason: String,
    },
    /// Wrapped transaction-layer error.
    #[error("Transaction error: {0}")]
    Transaction(#[from] TransactionError),
    /// Wrapped vector-crate error.
    #[error("Vector error: {0}")]
    Vector(#[from] manifoldb_vector::error::VectorError),
    /// Free-form storage/consistency error (e.g. a graph with nodes but no entry point).
    // NOTE(review): displays with the same "Storage error:" prefix as `StorageError` below,
    // so the two are indistinguishable in messages.
    #[error("Storage error: {0}")]
    Storage(String),
    /// Wrapped storage-engine error.
    #[error("Storage error: {0}")]
    StorageError(#[from] manifoldb_storage::StorageError),
}
/// Builder for a named HNSW index over one named vector ("column") of a collection ("table").
///
/// Defaults: no fixed dimension (taken from the first stored vector found, or 1 if none),
/// [`DistanceMetric::Cosine`], and [`HnswConfig::default`]. Call [`HnswIndexBuilder::build`]
/// to populate the graph from stored vectors and register the index.
pub struct HnswIndexBuilder {
    /// Index name; registry key and source of the backing table name.
    name: String,
    /// Collection whose vectors are indexed.
    table: String,
    /// Named vector within the collection.
    column: String,
    /// Expected dimension; `None` means "infer from the data".
    dimension: Option<usize>,
    /// Distance metric used to compare vectors.
    distance_metric: DistanceMetric,
    /// HNSW construction/search parameters.
    config: HnswConfig,
}

impl HnswIndexBuilder {
    /// Creates a builder for index `name` over `column` of collection `table`.
    #[must_use]
    pub fn new(
        name: impl Into<String>,
        table: impl Into<String>,
        column: impl Into<String>,
    ) -> Self {
        Self {
            name: name.into(),
            table: table.into(),
            column: column.into(),
            dimension: None,
            distance_metric: DistanceMetric::Cosine,
            config: HnswConfig::default(),
        }
    }

    /// Fixes the expected vector dimension; stored vectors of another length are rejected.
    #[must_use]
    pub const fn dimension(mut self, dimension: usize) -> Self {
        self.dimension = Some(dimension);
        self
    }

    /// Sets the distance metric.
    #[must_use]
    pub const fn distance_metric(mut self, metric: DistanceMetric) -> Self {
        self.distance_metric = metric;
        self
    }

    /// Sets the per-layer connection count `m`; layer 0 gets `2 * m` (`m_max0`).
    #[must_use]
    pub fn m(mut self, m: usize) -> Self {
        self.config.m = m;
        self.config.m_max0 = 2 * m;
        self
    }

    /// Sets the candidate-list size used while inserting nodes.
    #[must_use]
    pub fn ef_construction(mut self, ef: usize) -> Self {
        self.config.ef_construction = ef;
        self
    }

    /// Sets the default candidate-list size used while searching.
    #[must_use]
    pub fn ef_search(mut self, ef: usize) -> Self {
        self.config.ef_search = ef;
        self
    }

    /// Builds the index inside `tx`.
    ///
    /// Scans `TABLE_COLLECTION_VECTORS` for the collection's entries whose vector-name hash
    /// matches `column`, inserts every dense vector into a fresh in-memory graph, persists the
    /// graph, and registers the index. Values that fail to decode and non-dense vectors are
    /// skipped. If the collection does not exist, an empty index is created.
    ///
    /// # Errors
    /// - [`VectorIndexError::IndexExists`] if an index with this name is already registered.
    /// - [`VectorIndexError::DimensionMismatch`] if a stored vector's length differs from the
    ///   configured (or first-seen) dimension.
    /// - Storage, transaction and vector errors from the underlying layers.
    pub fn build<T: Transaction>(
        self,
        tx: &mut DatabaseTransaction<T>,
    ) -> Result<(), VectorIndexError> {
        use crate::collection::{CollectionManager, CollectionName};
        use manifoldb_storage::Cursor;
        use manifoldb_vector::{
            encoding::{decode_collection_vector_key, encode_collection_vector_prefix},
            TABLE_COLLECTION_VECTORS,
        };
        use std::ops::Bound;

        let storage = tx.storage_ref()?;
        if HnswRegistry::exists(storage, &self.name)? {
            return Err(VectorIndexError::IndexExists(self.name));
        }

        let collection_name = CollectionName::new(&self.table)
            .map_err(|e| VectorIndexError::Storage(e.to_string()))?;
        let collection_id = CollectionManager::get(tx, &collection_name)
            .map_err(|e| VectorIndexError::Storage(e.to_string()))?
            .map(|c| c.id());

        let mut vectors: Vec<(EntityId, Embedding)> = Vec::new();
        let mut inferred_dimension: Option<usize> = self.dimension;
        if let Some(coll_id) = collection_id {
            let prefix = encode_collection_vector_prefix(coll_id);
            // Correct exclusive bound for "every key starting with `prefix`"; unbounded when
            // the prefix has no finite successor (all 0xFF bytes).
            let prefix_end = prefix_upper_bound(&prefix);
            let upper = match prefix_end.as_deref() {
                Some(end) => Bound::Excluded(end),
                None => Bound::Unbounded,
            };
            let target_hash = manifoldb_vector::encoding::hash_name(&self.column);
            let storage = tx.storage_ref()?;
            let mut cursor =
                storage.range(TABLE_COLLECTION_VECTORS, Bound::Included(prefix.as_slice()), upper)?;
            while let Some((ref key, ref value)) = cursor.next()? {
                let decoded = match decode_collection_vector_key(&key) {
                    Some(decoded) if decoded.vector_name_hash == target_hash => decoded,
                    _ => continue,
                };
                // Best-effort: undecodable values are skipped rather than failing the build.
                let vector_data = match manifoldb_vector::decode_vector_value(&value) {
                    Ok((vector_data, _)) => vector_data,
                    Err(_) => continue,
                };
                // Only dense vectors can go into an HNSW graph.
                let dense = match vector_data.as_dense() {
                    Some(dense) => dense,
                    None => continue,
                };
                let embedding = Embedding::new(dense.to_vec())?;
                match inferred_dimension {
                    None => inferred_dimension = Some(embedding.len()),
                    Some(dim) if dim != embedding.len() => {
                        return Err(VectorIndexError::DimensionMismatch {
                            expected: dim,
                            actual: embedding.len(),
                        });
                    }
                    _ => {}
                }
                vectors.push((decoded.entity_id, embedding));
            }
        }

        // With no configured dimension and no data, fall back to 1; `add_to_index` adopts the
        // first inserted vector's dimension while the graph is still empty.
        let dimension = inferred_dimension.unwrap_or(1);
        let mut graph = HnswGraph::new(dimension, self.distance_metric);
        for (entity_id, embedding) in vectors {
            insert_into_graph(&mut graph, entity_id, embedding, &self.config)?;
        }

        let table_name = hnsw_table_name(&self.name);
        let storage = tx.storage_mut_ref()?;
        save_graph_tx(storage, &table_name, &graph, &self.config)?;
        let entry = HnswIndexEntry::new(
            &self.name,
            &self.table,
            &self.column,
            dimension,
            self.distance_metric,
            &self.config,
        );
        HnswRegistry::register(storage, &entry)?;
        Ok(())
    }
}

/// Returns the smallest byte string greater than every key that starts with `prefix`.
///
/// The last byte that is not 0xFF is incremented and everything after it is dropped; e.g.
/// `[0x01, 0xFF]` -> `[0x02]`. Returns `None` when no such bound exists (empty or all-0xFF
/// prefix), in which case the scan must be unbounded above.
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    let last = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut end = prefix[..=last].to_vec();
    end[last] += 1;
    Some(end)
}
/// Drops the HNSW index `name`, clearing its backing table and removing its registry entry.
///
/// Returns `Ok(true)` when an index was dropped. If no such index exists, returns `Ok(false)`
/// when `if_exists` is set and [`VectorIndexError::IndexNotFound`] otherwise.
pub fn drop_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    name: &str,
    if_exists: bool,
) -> Result<bool, VectorIndexError> {
    let registered = HnswRegistry::exists(tx.storage_ref()?, name)?;
    match (registered, if_exists) {
        (false, true) => Ok(false),
        (false, false) => Err(VectorIndexError::IndexNotFound(name.to_string())),
        (true, _) => {
            let backing_table = hnsw_table_name(name);
            let storage = tx.storage_mut_ref()?;
            clear_index_tx(storage, &backing_table)?;
            HnswRegistry::drop(storage, name)?;
            Ok(true)
        }
    }
}
/// Loads the persisted graph and configuration of index `name`.
///
/// # Errors
/// [`VectorIndexError::IndexNotFound`] if the registry entry or the stored graph metadata is
/// missing; storage/vector errors while reading the graph.
pub fn load_index<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    name: &str,
) -> Result<(HnswGraph, HnswConfig), VectorIndexError> {
    let not_found = || VectorIndexError::IndexNotFound(name.to_string());
    let storage = tx.storage_ref()?;
    let registry_entry = HnswRegistry::get(storage, name)?.ok_or_else(not_found)?;
    let backing_table = hnsw_table_name(name);
    let metadata = load_metadata_tx(storage, &backing_table)?.ok_or_else(not_found)?;
    let graph = load_graph_tx(storage, &backing_table, &metadata)?;
    Ok((graph, registry_entry.config()))
}
/// Returns the name of the first registered index on `table` whose column is `column`.
pub fn find_index_for_column<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    table: &str,
    column: &str,
) -> Result<Option<String>, VectorIndexError> {
    let table_indexes = HnswRegistry::list_for_table(tx.storage_ref()?, table)?;
    let matching = table_indexes.into_iter().find(|entry| entry.column == column);
    Ok(matching.map(|entry| entry.name.clone()))
}
/// Returns up to `k` nearest neighbours of `query` in index `index_name`.
///
/// Standard HNSW search: greedy descent (beam width 1) from the entry point through the upper
/// layers, then a beam search of width `ef` on layer 0. `ef` is `ef_search` (or the index's
/// configured value) raised to at least `k`.
///
/// # Errors
/// [`VectorIndexError::DimensionMismatch`] if `query` does not match the index dimension;
/// [`VectorIndexError::IndexNotFound`] if the index does not exist; a `Storage` error if the
/// graph has nodes but no entry point.
pub fn search_index<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    index_name: &str,
    query: &Embedding,
    k: usize,
    ef_search: Option<usize>,
) -> Result<Vec<SearchResult>, VectorIndexError> {
    let (graph, config) = load_index(tx, index_name)?;
    if graph.dimension != query.len() {
        return Err(VectorIndexError::DimensionMismatch {
            expected: graph.dimension,
            actual: query.len(),
        });
    }
    if graph.nodes.is_empty() {
        return Ok(Vec::new());
    }
    let entry_point = graph
        .entry_point
        .ok_or_else(|| VectorIndexError::Storage("no entry point".to_string()))?;
    let ef = ef_search.unwrap_or(config.ef_search).max(k);

    // Upper layers: narrow down to a good layer-0 entry; fall back to the global entry point
    // if a layer yields nothing.
    let mut current_ep = vec![entry_point];
    for layer in (1..=graph.max_layer).rev() {
        let candidates =
            manifoldb_vector::index::search_layer(&graph, query, &current_ep, 1, layer);
        current_ep = candidates.into_iter().map(|c| c.entity_id).collect();
        if current_ep.is_empty() {
            current_ep = vec![entry_point];
        }
    }

    let candidates = manifoldb_vector::index::search_layer(&graph, query, &current_ep, ef, 0);
    let results: Vec<SearchResult> = candidates
        .into_iter()
        .take(k)
        .map(|c| SearchResult::new(c.entity_id, c.distance))
        .collect();
    Ok(results)
}
/// Like [`search_index`], but layer-0 results are restricted to entities accepted by
/// `predicate`.
///
/// The base `ef` (`ef_search` or the configured value, at least `k`) is adjusted through
/// `filter_config` (default [`FilteredSearchConfig`] if `None`) to compensate for filtering.
///
/// # Errors
/// Same as [`search_index`].
pub fn search_index_filtered<T, F>(
    tx: &DatabaseTransaction<T>,
    index_name: &str,
    query: &Embedding,
    k: usize,
    predicate: F,
    ef_search: Option<usize>,
    filter_config: Option<FilteredSearchConfig>,
) -> Result<Vec<SearchResult>, VectorIndexError>
where
    T: Transaction,
    F: Fn(EntityId) -> bool,
{
    let (graph, config) = load_index(tx, index_name)?;
    if graph.dimension != query.len() {
        return Err(VectorIndexError::DimensionMismatch {
            expected: graph.dimension,
            actual: query.len(),
        });
    }
    if graph.nodes.is_empty() {
        return Ok(Vec::new());
    }
    let entry_point = graph
        .entry_point
        .ok_or_else(|| VectorIndexError::Storage("no entry point".to_string()))?;
    let fc = filter_config.unwrap_or_default();
    let base_ef = ef_search.unwrap_or(config.ef_search).max(k);
    let ef = fc.adjusted_ef(base_ef, None);

    // Unfiltered greedy descent through the upper layers; the filter applies on layer 0 only.
    let mut current_ep = vec![entry_point];
    for layer in (1..=graph.max_layer).rev() {
        let candidates =
            manifoldb_vector::index::search_layer(&graph, query, &current_ep, 1, layer);
        current_ep = candidates.into_iter().map(|c| c.entity_id).collect();
        if current_ep.is_empty() {
            current_ep = vec![entry_point];
        }
    }

    let candidates = search_layer_filtered(&graph, query, &current_ep, ef, 0, &predicate);
    let results: Vec<SearchResult> = candidates
        .into_iter()
        .take(k)
        .map(|c| SearchResult::new(c.entity_id, c.distance))
        .collect();
    Ok(results)
}
/// Brings the HNSW indexes affected by a write of `entity` up to date.
///
/// Every label carried by `entity` or by `old_entity` is visited, and for each index on that
/// label the old contribution is compared with the new one:
/// - the old contribution is `old_entity`'s vector, and only if `old_entity` carried the label;
/// - the new contribution is `entity`'s vector, and only if `entity` carries the label.
///
/// The comparison then decides what happens:
/// - absent -> present: the entity is added;
/// - present -> absent: the entity is removed;
/// - present -> present with different values: the entity is re-inserted;
/// - otherwise nothing changes.
///
/// Considering label membership means adding a label indexes the entity, and dropping a label
/// removes it, even when the vector itself did not change.
///
/// # Errors
/// Registry/storage errors, and [`VectorIndexError::InvalidVector`] if the new entity's indexed
/// property is not a valid embedding. Invalid *old* vectors are treated as absent.
pub fn update_entity_in_indexes<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    entity: &Entity,
    old_entity: Option<&Entity>,
) -> Result<(), VectorIndexError> {
    /// Whether `entity` carries `label`.
    fn carries_label(entity: &Entity, label: &str) -> bool {
        (&entity.labels).into_iter().any(|l| l.as_str() == label)
    }

    // Union of the labels of both versions, without duplicates.
    let mut labels: Vec<&str> = Vec::new();
    for label in &entity.labels {
        if !labels.contains(&label.as_str()) {
            labels.push(label.as_str());
        }
    }
    if let Some(old) = old_entity {
        for label in &old.labels {
            if !labels.contains(&label.as_str()) {
                labels.push(label.as_str());
            }
        }
    }

    let mut index_operations: Vec<(String, Option<Embedding>, Option<Embedding>)> = Vec::new();
    {
        let storage = tx.storage_ref()?;
        for label in labels {
            let old_source = old_entity.filter(|old| carries_label(old, label));
            let in_new = carries_label(entity, label);
            for entry in HnswRegistry::list_for_table(storage, label)? {
                let old_embedding = old_source
                    .and_then(|e| extract_embedding(e, &entry.column).ok().flatten());
                let new_embedding =
                    if in_new { extract_embedding(entity, &entry.column)? } else { None };
                index_operations.push((entry.name.clone(), old_embedding, new_embedding));
            }
        }
    }

    for (index_name, old_embedding, new_embedding) in index_operations {
        match (old_embedding, new_embedding) {
            (None, Some(embedding)) => {
                add_to_index(tx, &index_name, entity.id, embedding)?;
            }
            (Some(_), None) => {
                remove_from_index(tx, &index_name, entity.id)?;
            }
            (Some(old), Some(new)) if old.as_slice() != new.as_slice() => {
                remove_from_index(tx, &index_name, entity.id)?;
                add_to_index(tx, &index_name, entity.id, new)?;
            }
            // Unchanged vector, or no vector in either version.
            _ => {}
        }
    }
    Ok(())
}
/// Removes `entity` from every index on its labels whose column holds a vector on the entity.
///
/// # Errors
/// Registry/storage errors, or [`VectorIndexError::InvalidVector`] if an indexed property
/// cannot be converted to an embedding.
pub fn remove_entity_from_indexes<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    entity: &Entity,
) -> Result<(), VectorIndexError> {
    // Collect names first so the read borrow of storage ends before the mutations start.
    let mut affected: Vec<String> = Vec::new();
    {
        let storage = tx.storage_ref()?;
        for label in &entity.labels {
            for entry in HnswRegistry::list_for_table(storage, label.as_str())? {
                if extract_embedding(entity, &entry.column)?.is_some() {
                    affected.push(entry.name.clone());
                }
            }
        }
    }
    affected.iter().try_for_each(|index_name| remove_from_index(tx, index_name, entity.id))
}
/// Inserts `embedding` for `entity_id` into index `index_name` and persists the graph.
///
/// An empty graph adopts the embedding's dimension; a non-empty graph rejects a different
/// dimension with [`VectorIndexError::DimensionMismatch`].
fn add_to_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    index_name: &str,
    entity_id: EntityId,
    embedding: Embedding,
) -> Result<(), VectorIndexError> {
    let (mut graph, config) = load_index(tx, index_name)?;
    if graph.nodes.is_empty() {
        graph.dimension = embedding.len();
    } else if graph.dimension != embedding.len() {
        return Err(VectorIndexError::DimensionMismatch {
            expected: graph.dimension,
            actual: embedding.len(),
        });
    }
    insert_into_graph(&mut graph, entity_id, embedding, &config)?;
    let backing_table = hnsw_table_name(index_name);
    save_graph_tx(tx.storage_mut_ref()?, &backing_table, &graph, &config)?;
    Ok(())
}
/// Removes `entity_id` from index `index_name` and persists the result.
///
/// The node's stored rows are deleted only if it was present. The graph is re-saved either way.
fn remove_from_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    index_name: &str,
    entity_id: EntityId,
) -> Result<(), VectorIndexError> {
    let (mut graph, config) = load_index(tx, index_name)?;
    // Capture the node's layer count before it disappears from the graph.
    let removed_top_layer = graph.nodes.get(&entity_id).map(|node| node.max_layer);
    remove_from_graph(&mut graph, entity_id)?;

    let backing_table = hnsw_table_name(index_name);
    let storage = tx.storage_mut_ref()?;
    match removed_top_layer {
        Some(top_layer) => delete_node_tx(storage, &backing_table, entity_id, top_layer)?,
        None => {}
    }
    save_graph_tx(storage, &backing_table, &graph, &config)?;
    Ok(())
}
/// Reads property `column` of `entity` as an embedding.
///
/// Returns `Ok(None)` if the property is missing or is not a vector value.
/// Returns [`VectorIndexError::InvalidVector`] if it is a vector that [`Embedding::new`] rejects.
fn extract_embedding(entity: &Entity, column: &str) -> Result<Option<Embedding>, VectorIndexError> {
    if let Some(Value::Vector(raw)) = entity.get_property(column) {
        Embedding::new(raw.clone()).map(Some).map_err(|e| VectorIndexError::InvalidVector {
            entity_id: entity.id,
            reason: e.to_string(),
        })
    } else {
        Ok(None)
    }
}
fn insert_into_graph(
graph: &mut HnswGraph,
entity_id: EntityId,
embedding: Embedding,
config: &HnswConfig,
) -> Result<(), VectorIndexError> {
let level = generate_level(config.ml, config.m);
let node = HnswNode::new(entity_id, embedding.clone(), level);
if graph.nodes.is_empty() {
graph.entry_point = Some(entity_id);
graph.max_layer = level;
graph.nodes.insert(entity_id, node);
return Ok(());
}
let entry_point =
graph.entry_point.ok_or_else(|| VectorIndexError::Storage("no entry point".to_string()))?;
let mut current = entry_point;
for layer in (level + 1..=graph.max_layer).rev() {
current = search_layer_greedy(graph, &embedding, current, layer)?;
}
for layer in (0..=level.min(graph.max_layer)).rev() {
let neighbors =
search_layer_candidates(graph, &embedding, current, layer, config.ef_construction)?;
let m = if layer == 0 { config.m_max0 } else { config.m };
let selected: Vec<EntityId> = neighbors.into_iter().take(m).collect();
graph.nodes.entry(entity_id).or_insert_with(|| node.clone()).connections[layer]
.clone_from(&selected);
for &neighbor_id in &selected {
if let Some(neighbor) = graph.nodes.get_mut(&neighbor_id) {
if layer < neighbor.connections.len() {
let neighbor_m = if layer == 0 { config.m_max0 } else { config.m };
if !neighbor.connections[layer].contains(&entity_id) {
neighbor.connections[layer].push(entity_id);
if neighbor.connections[layer].len() > neighbor_m {
prune_connections(graph, neighbor_id, layer, neighbor_m)?;
}
}
}
}
}
if !selected.is_empty() {
current = selected[0];
}
}
if level > graph.max_layer {
graph.max_layer = level;
graph.entry_point = Some(entity_id);
}
Ok(())
}
/// Removes `entity_id` from `graph`, unlinking it from the neighbours it points to.
///
/// Unknown ids are ignored. If the entry point is removed, the remaining node with the highest
/// layer becomes the new entry point and `max_layer` is recomputed (0 for an empty graph).
/// Neighbours are not re-linked to each other.
fn remove_from_graph(graph: &mut HnswGraph, entity_id: EntityId) -> Result<(), VectorIndexError> {
    // Snapshot the outgoing links on every layer the node occupies.
    let adjacency: Vec<Vec<EntityId>> = match graph.nodes.get(&entity_id) {
        Some(node) => (0..=node.max_layer)
            .map(|layer| node.connections.get(layer).cloned().unwrap_or_default())
            .collect(),
        None => return Ok(()),
    };

    for (layer, neighbors) in adjacency.into_iter().enumerate() {
        for neighbor_id in neighbors {
            let links = graph
                .nodes
                .get_mut(&neighbor_id)
                .and_then(|neighbor| neighbor.connections.get_mut(layer));
            if let Some(links) = links {
                links.retain(|&id| id != entity_id);
            }
        }
    }

    graph.nodes.remove(&entity_id);
    if graph.entry_point == Some(entity_id) {
        let promoted = graph.nodes.iter().max_by_key(|(_, node)| node.max_layer);
        graph.max_layer = promoted.map_or(0, |(_, node)| node.max_layer);
        graph.entry_point = promoted.map(|(&id, _)| id);
    }
    Ok(())
}
/// Draws a random HNSW layer for a new node: `floor(-ln(r) * ml)` with `r` in `(0, 1]`.
///
/// This yields the geometric level distribution HNSW relies on (`ml` is the level
/// normalisation factor). `_m` is unused and kept for signature compatibility.
fn generate_level(ml: f64, _m: usize) -> usize {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};

    // `RandomState::new()` hands out differently keyed hashers on every call, so successive
    // draws differ even when the clock has not advanced. A fixed-key `DefaultHasher` over the
    // timestamp alone repeats the same level whenever `SystemTime::now()` repeats.
    let mut hasher = RandomState::new().build_hasher();
    std::time::SystemTime::now().hash(&mut hasher);
    let random = hasher.finish();
    // Map to (0, 1] so `ln(r)` is always finite. `r == 0` would yield an infinite level,
    // which saturates to `usize::MAX`.
    let r = (random as f64 + 1.0) / (u64::MAX as f64 + 1.0);
    (-r.ln() * ml).floor() as usize
}
/// Greedy search on `layer`: starting at `entry`, repeatedly moves to the closest neighbour
/// until no neighbour is closer to `query`. Returns the local minimum found.
///
/// # Errors
/// A `Storage` error if a referenced node is missing from the graph.
fn search_layer_greedy(
    graph: &HnswGraph,
    query: &Embedding,
    entry: EntityId,
    layer: usize,
) -> Result<EntityId, VectorIndexError> {
    let mut current = entry;
    let mut current_dist = compute_distance(graph, query, current)?;
    loop {
        let mut changed = false;
        // A missing node or a node without this layer simply has no neighbours to try.
        if let Some(neighbors) =
            graph.nodes.get(&current).and_then(|node| node.connections.get(layer))
        {
            for &neighbor_id in neighbors {
                let dist = compute_distance(graph, query, neighbor_id)?;
                if dist < current_dist {
                    current = neighbor_id;
                    current_dist = dist;
                    changed = true;
                }
            }
        }
        if !changed {
            break;
        }
    }
    Ok(current)
}
/// Beam search of width `ef` on `layer`, starting from `entry`.
///
/// Returns the ids of the (up to) `ef` closest nodes found, nearest first.
///
/// # Errors
/// A `Storage` error if a referenced node is missing from the graph.
fn search_layer_candidates(
    graph: &HnswGraph,
    query: &Embedding,
    entry: EntityId,
    layer: usize,
    ef: usize,
) -> Result<Vec<EntityId>, VectorIndexError> {
    use std::cmp::{Ordering, Reverse};
    use std::collections::{BinaryHeap, HashSet};

    /// Heap entry ordered by distance only. `total_cmp` gives a real total order, so the
    /// `Ord`/`Eq` contracts hold even for NaN distances.
    struct Candidate {
        distance: f32,
        id: EntityId,
    }
    impl PartialEq for Candidate {
        fn eq(&self, other: &Self) -> bool {
            self.cmp(other) == Ordering::Equal
        }
    }
    impl Eq for Candidate {}
    impl PartialOrd for Candidate {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for Candidate {
        fn cmp(&self, other: &Self) -> Ordering {
            self.distance.total_cmp(&other.distance)
        }
    }

    let entry_dist = compute_distance(graph, query, entry)?;
    let mut visited = HashSet::new();
    visited.insert(entry);
    // Min-heap of nodes still to expand (closest on top).
    let mut candidates = BinaryHeap::new();
    candidates.push(Reverse(Candidate { distance: entry_dist, id: entry }));
    // Max-heap of the best `ef` nodes so far (furthest on top).
    let mut results = BinaryHeap::new();
    results.push(Candidate { distance: entry_dist, id: entry });

    while let Some(Reverse(Candidate { distance: current_dist, id: current })) = candidates.pop() {
        let furthest = results.peek().map_or(f32::MAX, |c| c.distance);
        if current_dist > furthest && results.len() >= ef {
            break;
        }
        let neighbors = match graph.nodes.get(&current).and_then(|n| n.connections.get(layer)) {
            Some(neighbors) => neighbors,
            None => continue,
        };
        for &neighbor_id in neighbors {
            if !visited.insert(neighbor_id) {
                continue;
            }
            let dist = compute_distance(graph, query, neighbor_id)?;
            // Compare against the *current* furthest result, which shrinks as results improve.
            let furthest = results.peek().map_or(f32::MAX, |c| c.distance);
            if results.len() < ef || dist < furthest {
                candidates.push(Reverse(Candidate { distance: dist, id: neighbor_id }));
                results.push(Candidate { distance: dist, id: neighbor_id });
                if results.len() > ef {
                    results.pop();
                }
            }
        }
    }

    // `into_sorted_vec` returns ascending order, i.e. nearest first.
    Ok(results.into_sorted_vec().into_iter().map(|c| c.id).collect())
}
/// Trims `node_id`'s link list on `layer` to its `max_connections` closest neighbours.
///
/// Links to ids that are no longer in the graph are dropped. Missing nodes or layers are ignored.
fn prune_connections(
    graph: &mut HnswGraph,
    node_id: EntityId,
    layer: usize,
    max_connections: usize,
) -> Result<(), VectorIndexError> {
    // Snapshot the node's embedding and its links on `layer`; bail out if either is absent.
    let snapshot = graph.nodes.get(&node_id).and_then(|node| {
        node.connections.get(layer).map(|links| (node.embedding.clone(), links.clone()))
    });
    let (anchor, links) = match snapshot {
        Some(pair) => pair,
        None => return Ok(()),
    };

    let metric = graph.distance_metric;
    let mut ranked: Vec<(f32, EntityId)> = links
        .iter()
        .filter_map(|neighbor_id| {
            graph.nodes.get(neighbor_id).map(|neighbor| {
                (compute_distance_embeddings(&anchor, &neighbor.embedding, metric), *neighbor_id)
            })
        })
        .collect();
    ranked.sort_by(|lhs, rhs| lhs.0.partial_cmp(&rhs.0).unwrap_or(std::cmp::Ordering::Equal));
    let kept: Vec<EntityId> =
        ranked.into_iter().take(max_connections).map(|(_, id)| id).collect();

    let slot = graph.nodes.get_mut(&node_id).and_then(|node| node.connections.get_mut(layer));
    if let Some(slot) = slot {
        *slot = kept;
    }
    Ok(())
}
/// Distance from `query` to the embedding stored for `node_id`, using the graph's metric.
///
/// # Errors
/// A `Storage` error if `node_id` is not in the graph.
fn compute_distance(
    graph: &HnswGraph,
    query: &Embedding,
    node_id: EntityId,
) -> Result<f32, VectorIndexError> {
    match graph.nodes.get(&node_id) {
        Some(node) => Ok(compute_distance_embeddings(query, &node.embedding, graph.distance_metric)),
        None => Err(VectorIndexError::Storage(format!("node not found: {:?}", node_id))),
    }
}
/// Distance between `a` and `b` under `metric` (smaller means closer).
///
/// The metrics are computed as follows:
/// - Cosine: `1 - cos(a, b)`, or `f32::MAX` if either vector has zero norm.
/// - Dot product: the negated dot product, so larger dot products rank closer.
/// - Euclidean, Manhattan, Chebyshev: the L2, L1 and L∞ norms of `a - b`.
///
/// Extra components beyond the shorter vector are ignored.
fn compute_distance_embeddings(a: &Embedding, b: &Embedding, metric: DistanceMetric) -> f32 {
    let (lhs, rhs) = (a.as_slice(), b.as_slice());
    let dot = || lhs.iter().zip(rhs).map(|(x, y)| x * y).sum::<f32>();
    let abs_diffs = || lhs.iter().zip(rhs).map(|(x, y)| (x - y).abs());
    match metric {
        DistanceMetric::Euclidean => {
            lhs.iter().zip(rhs).map(|(x, y)| (x - y).powi(2)).sum::<f32>().sqrt()
        }
        DistanceMetric::Cosine => {
            let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
            let (norm_lhs, norm_rhs) = (norm(lhs), norm(rhs));
            if norm_lhs == 0.0 || norm_rhs == 0.0 {
                f32::MAX
            } else {
                1.0 - dot() / (norm_lhs * norm_rhs)
            }
        }
        DistanceMetric::DotProduct => -dot(),
        DistanceMetric::Manhattan => abs_diffs().sum(),
        DistanceMetric::Chebyshev => abs_diffs().fold(0.0f32, f32::max),
    }
}
/// Creates an empty HNSW index for each named vector in `vectors` that needs one.
///
/// A vector qualifies when it has a known dimension, uses [`IndexMethod::Hnsw`], and has a
/// dense distance type. Returns the names of the created indexes in input order.
///
/// [`IndexMethod::Hnsw`]: crate::collection::IndexMethod::Hnsw
pub fn create_indexes_for_collection<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
    vectors: impl IntoIterator<Item = (String, VectorConfig)>,
) -> Result<Vec<String>, VectorIndexError> {
    use crate::collection::{DistanceType, IndexMethod};

    let mut created_names = Vec::new();
    for (vector_name, config) in vectors {
        let (dimension, hnsw_params) = match (config.dimension(), &config.index.method) {
            (Some(dimension), IndexMethod::Hnsw(params)) => (dimension, params),
            _ => continue,
        };
        let distance_metric = match &config.distance {
            DistanceType::Dense(metric) => *metric,
            _ => continue,
        };
        let hnsw_config = HnswConfig {
            m: hnsw_params.m,
            m_max0: hnsw_params.m_max0,
            ef_construction: hnsw_params.ef_construction,
            ef_search: hnsw_params.ef_search,
            ..HnswConfig::default()
        };
        let index_name = create_index_for_named_vector(
            tx,
            collection_name,
            &vector_name,
            dimension,
            distance_metric,
            &hnsw_config,
        )?;
        created_names.push(index_name);
    }
    Ok(created_names)
}
/// Creates and registers an empty HNSW index for `collection_name`/`vector_name`.
///
/// Returns the derived index name.
///
/// # Errors
/// [`VectorIndexError::IndexExists`] if the derived index name is already registered.
pub fn create_index_for_named_vector<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
    dimension: usize,
    distance_metric: DistanceMetric,
    config: &HnswConfig,
) -> Result<String, VectorIndexError> {
    let index_name = HnswRegistry::index_name_for_vector(collection_name, vector_name);
    if HnswRegistry::exists(tx.storage_ref()?, &index_name)? {
        return Err(VectorIndexError::IndexExists(index_name));
    }

    let empty_graph = HnswGraph::new(dimension, distance_metric);
    let backing_table = hnsw_table_name(&index_name);
    let storage = tx.storage_mut_ref()?;
    save_graph_tx(storage, &backing_table, &empty_graph, config)?;
    let registry_entry = HnswIndexEntry::for_named_vector(
        collection_name,
        vector_name,
        dimension,
        distance_metric,
        config,
    );
    HnswRegistry::register(storage, &registry_entry)?;
    Ok(index_name)
}
/// Drops every HNSW index registered for `collection_name`.
///
/// Returns the names of the dropped indexes.
pub fn drop_indexes_for_collection<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
) -> Result<Vec<String>, VectorIndexError> {
    let registered = HnswRegistry::list_for_collection(tx.storage_ref()?, collection_name)?;
    registered
        .into_iter()
        .map(|entry| {
            drop_index(tx, &entry.name, true)?;
            Ok::<_, VectorIndexError>(entry.name)
        })
        .collect()
}
/// Returns the name of the index registered for `collection_name`/`vector_name`, if any.
pub fn find_index_for_named_vector<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
) -> Result<Option<String>, VectorIndexError> {
    let found =
        HnswRegistry::get_for_named_vector(tx.storage_ref()?, collection_name, vector_name)?;
    Ok(found.map(|entry| entry.name.clone()))
}
/// Whether an index is registered for `collection_name`/`vector_name`.
pub fn has_index_for_named_vector<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
) -> Result<bool, VectorIndexError> {
    let storage = tx.storage_ref()?;
    let present = HnswRegistry::exists_for_named_vector(storage, collection_name, vector_name)?;
    Ok(present)
}
/// Lists the registry entries of every index registered for `collection_name`.
pub fn list_indexes_for_collection<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    collection_name: &str,
) -> Result<Vec<HnswIndexEntry>, VectorIndexError> {
    let storage = tx.storage_ref()?;
    let entries = HnswRegistry::list_for_collection(storage, collection_name)?;
    Ok(entries)
}
/// Replaces the vector of `point_id` in the index for `collection_name`/`vector_name`.
///
/// Does nothing if no such index exists. Otherwise the point is removed and re-added.
pub fn update_point_vector_in_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
    point_id: PointId,
    vector_data: &[f32],
) -> Result<(), VectorIndexError> {
    let index_name = HnswRegistry::index_name_for_vector(collection_name, vector_name);
    let indexed = HnswRegistry::exists(tx.storage_ref()?, &index_name)?;
    if indexed {
        let embedding = Embedding::new(vector_data.to_vec())?;
        let entity_id = EntityId::new(point_id.as_u64());
        remove_from_index(tx, &index_name, entity_id)?;
        add_to_index(tx, &index_name, entity_id, embedding)?;
    }
    Ok(())
}
/// Removes `point_id` from the index for `collection_name`/`vector_name`, if that index exists.
pub fn remove_point_vector_from_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
    point_id: PointId,
) -> Result<(), VectorIndexError> {
    let index_name = HnswRegistry::index_name_for_vector(collection_name, vector_name);
    let indexed = HnswRegistry::exists(tx.storage_ref()?, &index_name)?;
    if indexed {
        remove_from_index(tx, &index_name, EntityId::new(point_id.as_u64()))?;
    }
    Ok(())
}
/// Removes `point_id` from every index registered for `collection_name`.
pub fn remove_point_from_collection_indexes<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    collection_name: &str,
    point_id: PointId,
) -> Result<(), VectorIndexError> {
    let registered = HnswRegistry::list_for_collection(tx.storage_ref()?, collection_name)?;
    let entity_id = EntityId::new(point_id.as_u64());
    registered.iter().try_for_each(|entry| remove_from_index(tx, &entry.name, entity_id))
}
/// [`search_index`] on the index derived from `collection_name`/`vector_name`.
pub fn search_named_vector_index<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
    query: &Embedding,
    k: usize,
    ef_search: Option<usize>,
) -> Result<Vec<SearchResult>, VectorIndexError> {
    search_index(
        tx,
        &HnswRegistry::index_name_for_vector(collection_name, vector_name),
        query,
        k,
        ef_search,
    )
}
/// [`search_index_filtered`] on the index derived from `collection_name`/`vector_name`.
pub fn search_named_vector_index_filtered<T, F>(
    tx: &DatabaseTransaction<T>,
    collection_name: &str,
    vector_name: &str,
    query: &Embedding,
    k: usize,
    predicate: F,
    ef_search: Option<usize>,
    filter_config: Option<FilteredSearchConfig>,
) -> Result<Vec<SearchResult>, VectorIndexError>
where
    T: Transaction,
    F: Fn(EntityId) -> bool,
{
    search_index_filtered(
        tx,
        &HnswRegistry::index_name_for_vector(collection_name, vector_name),
        query,
        k,
        predicate,
        ef_search,
        filter_config,
    )
}
use manifoldb_core::CollectionId;
use manifoldb_query::exec::CollectionVectorProvider;
use manifoldb_storage::StorageEngine;
use manifoldb_vector::error::VectorError;
use manifoldb_vector::index::VectorIndexCoordinator;
use manifoldb_vector::types::VectorData;
/// Adapts a [`VectorIndexCoordinator`] to the query engine's [`CollectionVectorProvider`].
pub struct CollectionVectorAdapter<E: StorageEngine> {
    coordinator: VectorIndexCoordinator<E>,
}

impl<E: StorageEngine> CollectionVectorAdapter<E> {
    /// Wraps `coordinator`.
    pub fn new(coordinator: VectorIndexCoordinator<E>) -> Self {
        CollectionVectorAdapter { coordinator }
    }

    /// Shared access to the wrapped coordinator.
    pub fn coordinator(&self) -> &VectorIndexCoordinator<E> {
        &self.coordinator
    }

    /// Exclusive access to the wrapped coordinator.
    pub fn coordinator_mut(&mut self) -> &mut VectorIndexCoordinator<E> {
        &mut self.coordinator
    }

    /// Consumes the adapter and returns the wrapped coordinator.
    pub fn into_coordinator(self) -> VectorIndexCoordinator<E> {
        let Self { coordinator } = self;
        coordinator
    }
}
/// Every provider operation delegates unchanged to the wrapped coordinator.
impl<E: StorageEngine + Send + Sync + 'static> CollectionVectorProvider
    for CollectionVectorAdapter<E>
{
    fn upsert_vector(
        &self,
        collection_id: CollectionId,
        entity_id: EntityId,
        collection_name: &str,
        vector_name: &str,
        data: &VectorData,
    ) -> Result<(), VectorError> {
        self.coordinator().upsert_vector(collection_id, entity_id, collection_name, vector_name, data)
    }

    fn delete_vector(
        &self,
        collection_id: CollectionId,
        entity_id: EntityId,
        collection_name: &str,
        vector_name: &str,
    ) -> Result<bool, VectorError> {
        self.coordinator().delete_vector(collection_id, entity_id, collection_name, vector_name)
    }

    fn delete_entity_vectors(
        &self,
        collection_id: CollectionId,
        entity_id: EntityId,
        collection_name: &str,
    ) -> Result<usize, VectorError> {
        self.coordinator().delete_entity_vectors(collection_id, entity_id, collection_name)
    }

    fn get_vector(
        &self,
        collection_id: CollectionId,
        entity_id: EntityId,
        vector_name: &str,
    ) -> Result<Option<VectorData>, VectorError> {
        self.coordinator().get_vector(collection_id, entity_id, vector_name)
    }

    fn get_all_vectors(
        &self,
        collection_id: CollectionId,
        entity_id: EntityId,
    ) -> Result<std::collections::HashMap<String, VectorData>, VectorError> {
        self.coordinator().get_all_vectors(collection_id, entity_id)
    }

    fn search(
        &self,
        collection_name: &str,
        vector_name: &str,
        query: &Embedding,
        k: usize,
        ef_search: Option<usize>,
    ) -> Result<Vec<SearchResult>, VectorError> {
        self.coordinator().search(collection_name, vector_name, query, k, ef_search)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collection::{CollectionManager, CollectionName};
    use crate::transaction::TransactionManager;
    use manifoldb_storage::backends::RedbEngine;
    use manifoldb_vector::{
        encode_vector_value, encoding::encode_collection_vector_key, VectorData,
        TABLE_COLLECTION_VECTORS,
    };

    /// Writes a dense vector for `entity_id` directly into the collection-vector table,
    /// creating the collection on first use.
    fn store_vector_for_test<T: Transaction>(
        tx: &mut DatabaseTransaction<T>,
        collection_name: &str,
        entity_id: EntityId,
        vector_name: &str,
        data: &[f32],
    ) {
        let name = CollectionName::new(collection_name).unwrap();
        let existing = CollectionManager::get(tx, &name).unwrap();
        let collection_id = match existing {
            Some(collection) => collection.id(),
            None => CollectionManager::create(tx, &name, std::iter::empty()).unwrap().id(),
        };
        let key = encode_collection_vector_key(collection_id, entity_id, vector_name);
        let value = encode_vector_value(&VectorData::Dense(data.to_vec()), vector_name);
        tx.storage_mut().unwrap().put(TABLE_COLLECTION_VECTORS, &key, &value).unwrap();
    }

    #[test]
    fn test_hnsw_index_builder() {
        let manager = TransactionManager::new(RedbEngine::in_memory().unwrap());
        let seeds: [[f32; 4]; 3] =
            [[0.1, 0.2, 0.3, 0.4], [0.2, 0.3, 0.4, 0.5], [0.3, 0.4, 0.5, 0.6]];

        // Seed three labelled entities, each with a stored 4-d vector.
        {
            let mut tx = manager.begin_write().unwrap();
            for data in &seeds {
                let entity = tx.create_entity().unwrap().with_label("documents");
                tx.put_entity(&entity).unwrap();
                store_vector_for_test(&mut tx, "documents", entity.id, "embedding", data);
            }
            tx.commit().unwrap();
        }

        // Build the index from the stored vectors.
        {
            let mut tx = manager.begin_write().unwrap();
            HnswIndexBuilder::new("test_idx", "documents", "embedding")
                .dimension(4)
                .distance_metric(DistanceMetric::Cosine)
                .m(4)
                .ef_construction(16)
                .build(&mut tx)
                .unwrap();
            tx.commit().unwrap();
        }

        // Every seeded vector must be in the persisted graph.
        {
            let tx = manager.begin_read().unwrap();
            let (graph, config) = load_index(&tx, "test_idx").unwrap();
            assert_eq!(graph.dimension, 4);
            assert_eq!(graph.nodes.len(), 3);
            assert_eq!(config.m, 4);
        }
    }

    #[test]
    fn test_drop_index() {
        let manager = TransactionManager::new(RedbEngine::in_memory().unwrap());

        {
            let mut tx = manager.begin_write().unwrap();
            HnswIndexBuilder::new("to_drop", "test", "vec").dimension(4).build(&mut tx).unwrap();
            tx.commit().unwrap();
        }
        {
            let tx = manager.begin_read().unwrap();
            assert!(load_index(&tx, "to_drop").is_ok());
        }
        {
            let mut tx = manager.begin_write().unwrap();
            let dropped = drop_index(&mut tx, "to_drop", false).unwrap();
            assert!(dropped);
            tx.commit().unwrap();
        }
        {
            let tx = manager.begin_read().unwrap();
            assert!(load_index(&tx, "to_drop").is_err());
        }
    }

    #[test]
    fn test_update_single_entity_hnsw() {
        let manager = TransactionManager::new(RedbEngine::in_memory().unwrap());

        // One labelled entity with a unit vector along the first axis.
        let entity_id = {
            let mut tx = manager.begin_write().unwrap();
            let entity = tx.create_entity().unwrap().with_label("docs");
            let id = entity.id;
            tx.put_entity(&entity).unwrap();
            store_vector_for_test(&mut tx, "docs", id, "embedding", &[1.0f32, 0.0, 0.0, 0.0]);
            tx.commit().unwrap();
            id
        };

        {
            let mut tx = manager.begin_write().unwrap();
            HnswIndexBuilder::new("single_entity_idx", "docs", "embedding")
                .dimension(4)
                .distance_metric(DistanceMetric::Cosine)
                .m(4)
                .ef_construction(16)
                .build(&mut tx)
                .unwrap();
            tx.commit().unwrap();
        }
        {
            let tx = manager.begin_read().unwrap();
            let (graph, _) = load_index(&tx, "single_entity_idx").unwrap();
            assert_eq!(graph.nodes.len(), 1);
            assert_eq!(graph.entry_point, Some(entity_id));
        }

        // Swap the vector for one along the last axis.
        {
            let mut tx = manager.begin_write().unwrap();
            remove_from_index(&mut tx, "single_entity_idx", entity_id).unwrap();
            let replacement = Embedding::new(vec![0.0f32, 0.0, 0.0, 1.0]).unwrap();
            add_to_index(&mut tx, "single_entity_idx", entity_id, replacement).unwrap();
            tx.commit().unwrap();
        }
        {
            let tx = manager.begin_read().unwrap();
            let (graph, _) = load_index(&tx, "single_entity_idx").unwrap();
            assert_eq!(graph.nodes.len(), 1);
            assert_eq!(graph.entry_point, Some(entity_id));
            let node = graph.nodes.get(&entity_id).unwrap();
            assert_eq!(node.embedding.as_slice(), &[0.0f32, 0.0, 0.0, 1.0]);
        }

        // Searching for the new vector finds the entity at (near) zero distance.
        {
            let tx = manager.begin_read().unwrap();
            let query = Embedding::new(vec![0.0f32, 0.0, 0.0, 1.0]).unwrap();
            let results = search_index(&tx, "single_entity_idx", &query, 1, None).unwrap();
            assert_eq!(results.len(), 1);
            assert_eq!(results[0].entity_id, entity_id);
            assert!(results[0].distance < 0.0001);
        }
    }
}