use crate::core::error::{Error, Result, VectorError};
use crate::core::id::NodeId;
use crate::core::property::MAX_VECTOR_DIMENSIONS;
use crate::core::vector::validate_vector;
use crate::index::vector::{DistanceMetric, Quantization, StorageMode, VectorIndex};
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use std::fs::File;
use std::io::BufWriter;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use usearch::{Index, IndexOptions, MetricKind, ScalarKind, ffi::Matches};
pub mod config;
pub mod persistence;
pub mod stats;
#[cfg(test)]
mod tests;
pub use config::{HnswConfig, HnswIndexBuilder};
use persistence::{load_mappings_with_integrity, verify_index_header, write_mappings_to_writer};
use stats::{IndexStats, MAX_SEARCH_ATTEMPTS};
// Thread-local flag set while a user-supplied predicate runs inside
// `search_with_filter`; mutating/searching entry points check it to fail
// fast instead of deadlocking on re-entrant lock acquisition.
std::thread_local! {
pub(crate) static IN_FILTER_CALLBACK: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
#[cfg(test)]
type TestRaceHook = fn(&HnswIndex, NodeId);
// Test-only hook invoked at race-sensitive points inside `add` so tests can
// deterministically interleave a competing operation.
#[cfg(test)]
std::thread_local! {
pub(crate) static TEST_RACE_HOOK: std::cell::Cell<Option<TestRaceHook>> = const { std::cell::Cell::new(None) };
}
// Test-only switch that bypasses `check_and_expand_capacity`, forcing the
// capacity-race recovery paths in `add` to be exercised.
#[cfg(test)]
pub(crate) static TEST_SKIP_CAPACITY_CHECK: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// RAII guard marking the current thread as executing inside a
/// `search_with_filter` predicate.
///
/// Construction sets the thread-local `IN_FILTER_CALLBACK` flag and
/// remembers its previous value; dropping the guard restores that value,
/// so nested guards unwind correctly.
pub(crate) struct FilterCallbackGuard {
    prev: bool,
}
impl FilterCallbackGuard {
    pub(crate) fn new() -> Self {
        Self {
            prev: IN_FILTER_CALLBACK.with(|flag| flag.replace(true)),
        }
    }
}
impl Drop for FilterCallbackGuard {
    fn drop(&mut self) {
        // Restore rather than blindly clear, so nesting stays correct.
        IN_FILTER_CALLBACK.with(|flag| flag.set(self.prev));
    }
}
// Hard cap applied to search `k`, guarding against absurd allocation requests.
const MAX_K: usize = 100_000;
// Number of striped per-NodeId mutexes serializing same-id add/remove
// without a single global lock.
const NUM_ENTRY_LOCKS: usize = 64;
fn to_usearch_metric(metric: DistanceMetric) -> MetricKind {
match metric {
DistanceMetric::Cosine => MetricKind::Cos,
DistanceMetric::Euclidean => MetricKind::L2sq,
DistanceMetric::DotProduct => MetricKind::IP,
DistanceMetric::Haversine => MetricKind::Haversine,
DistanceMetric::Hamming => MetricKind::Hamming,
DistanceMetric::Tanimoto => MetricKind::Tanimoto,
}
}
/// Metric used when constructing the usearch index itself. Tanimoto is
/// registered as `Cos` here because the real Tanimoto distance is installed
/// afterwards as an F32 callback (see `install_runtime_metric`).
fn to_usearch_index_metric(metric: DistanceMetric) -> MetricKind {
    if metric == DistanceMetric::Tanimoto {
        MetricKind::Cos
    } else {
        to_usearch_metric(metric)
    }
}
/// Map a `Quantization` level onto the usearch scalar storage kind.
fn to_usearch_scalar(quantization: Quantization) -> ScalarKind {
match quantization {
Quantization::F32 => ScalarKind::F32,
Quantization::F16 => ScalarKind::F16,
Quantization::I8 => ScalarKind::I8,
}
}
fn validate_metric_quantization(config: &HnswConfig) -> Result<()> {
if config.metric == DistanceMetric::Tanimoto && config.quantization != Quantization::F32 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"Tanimoto metric requires F32 quantization because it uses an F32 callback \
distance (requested {:?})",
config.quantization
),
}));
}
Ok(())
}
/// Tanimoto (extended Jaccard) distance between two equal-length vectors.
///
/// Accumulates in f64 for numerical stability, then computes
/// `1 - dot / (|a|^2 + |b|^2 - dot)` with the similarity clamped to [0, 1].
/// A non-positive denominator (e.g. both vectors all-zero) yields `0.0`;
/// a non-finite similarity yields `f32::MAX`.
fn tanimoto_distance(a: &[f32], b: &[f32]) -> f32 {
    let (dot, norm_a, norm_b) = a.iter().zip(b.iter()).fold(
        (0.0f64, 0.0f64, 0.0f64),
        |(dot, na, nb), (&x, &y)| {
            let (x, y) = (f64::from(x), f64::from(y));
            (dot + x * y, na + x * x, nb + y * y)
        },
    );
    let denominator = norm_a + norm_b - dot;
    // Degenerate case: treat as identical rather than dividing by zero.
    if denominator <= 0.0 {
        return 0.0;
    }
    let similarity = dot / denominator;
    if similarity.is_finite() {
        (1.0 - similarity.clamp(0.0, 1.0)) as f32
    } else {
        f32::MAX
    }
}
/// Install a runtime (callback) distance on `index` when the configuration
/// requires one: either a user-provided custom metric, or the built-in
/// Tanimoto implementation. Otherwise the native usearch metric remains in
/// effect and nothing is changed.
fn install_runtime_metric(index: &mut Index, config: &HnswConfig) {
    let wrapper = match (&config.custom_metric, config.metric) {
        (Some(custom), _) => Some(create_metric_wrapper(
            config.dimensions,
            Arc::clone(&custom.distance_fn),
        )),
        (None, DistanceMetric::Tanimoto) => Some(create_metric_wrapper(
            config.dimensions,
            Arc::new(tanimoto_distance),
        )),
        _ => None,
    };
    if let Some(wrapper) = wrapper {
        index.change_metric(wrapper);
    }
}
/// Whether a usearch error message indicates a transient condition worth
/// retrying (currently only its thread-pool exhaustion error).
pub(crate) fn is_retryable_usearch_error(error_msg: &str) -> bool {
    const RETRYABLE: &str = "No available threads to lock";
    error_msg.contains(RETRYABLE)
}
/// Wrap a safe slice-based distance function into the raw-pointer callback
/// shape usearch expects.
///
/// The wrapper defends the FFI boundary: null pointers, misaligned pointers,
/// and panics inside the user function all degrade to `f32::MAX` (treated as
/// "infinitely far") instead of invoking undefined behavior.
pub(crate) fn create_metric_wrapper<F>(
    dims: usize,
    distance_fn: Arc<F>,
) -> Box<dyn Fn(*const f32, *const f32) -> f32 + Send + Sync>
where
    F: Fn(&[f32], &[f32]) -> f32 + Send + Sync + 'static + ?Sized,
{
    Box::new(move |a: *const f32, b: *const f32| {
        if a.is_null() || b.is_null() {
            eprintln!("usearch passed null pointer to metric function - returning max distance");
            return f32::MAX;
        }
        let align = std::mem::align_of::<f32>();
        let misaligned = |p: *const f32| (p as usize) % align != 0;
        if misaligned(a) || misaligned(b) {
            eprintln!(
                "usearch passed unaligned pointer to metric function (expected alignment {}) - returning max distance",
                align
            );
            return f32::MAX;
        }
        // SAFETY: both pointers are non-null and aligned (checked above), and
        // usearch hands the callback buffers of `dims` contiguous f32 values.
        let lhs = unsafe { std::slice::from_raw_parts(a, dims) };
        let rhs = unsafe { std::slice::from_raw_parts(b, dims) };
        // A panic unwinding across the FFI boundary would be UB; absorb it.
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| distance_fn(lhs, rhs)))
            .unwrap_or_else(|_| {
                eprintln!(
                    "Panic in custom metric function - returning max distance to avoid FFI UB"
                );
                f32::MAX
            })
    })
}
/// HNSW approximate-nearest-neighbor index backed by usearch, with a
/// bidirectional NodeId <-> usearch-key mapping layered on top.
pub struct HnswIndex {
// The usearch index; mutations take the write lock, searches the read lock.
pub(crate) inner: Arc<RwLock<Index>>,
pub(crate) config: HnswConfig,
// NodeId -> usearch key and its inverse; kept in sync under `entry_locks`.
pub(crate) id_mapping: Arc<DashMap<NodeId, u64>>,
pub(crate) reverse_mapping: Arc<DashMap<u64, NodeId>>,
// Monotonic key allocator; keys are never reused (see MAX_VALID_KEY in add).
pub(crate) next_key: AtomicU64,
pub(crate) stats: Arc<IndexStats>,
// Upper bound applied to search `k` (MAX_K).
max_k: usize,
// True when opened via `open_mmap`; such an index rejects all mutation.
is_mmap: bool,
// Writers hold read(); `save_internal` holds write() for a consistent snapshot.
save_lock: Arc<RwLock<()>>,
// Striped per-NodeId locks serializing add/remove for the same id.
entry_locks: Vec<Mutex<()>>,
}
// SAFETY: NOTE(review) — these impls assert the FFI `Index` handle is safe to
// send/share across threads. usearch is used here behind an RwLock, but the
// claim ultimately rests on the usearch crate's thread-safety guarantees;
// confirm against the pinned usearch version.
unsafe impl Send for HnswIndex {}
unsafe impl Sync for HnswIndex {}
impl std::fmt::Debug for HnswIndex {
    /// Manual Debug: the raw usearch handle, mappings, and lock internals
    /// are elided; only summary fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("HnswIndex");
        builder.field("config", &self.config);
        builder.field("is_mmap", &self.is_mmap);
        builder.field("stats", &self.stats);
        builder.finish_non_exhaustive()
    }
}
impl HnswIndex {
/// Validate `config` and build the underlying usearch index.
///
/// Checks, in order: dimensions in (0, MAX_VECTOR_DIMENSIONS], M in [1, 64],
/// ef_construction in [10, 4096], ef_search in [1, 4096], and that custom
/// metrics / Tanimoto are only combined with F32 quantization.
///
/// # Errors
/// `VectorError::InvalidVector` for out-of-range configuration values;
/// `VectorError::IndexError` when usearch creation, reservation, or
/// memory-mapping fails.
pub(crate) fn new_internal(config: HnswConfig) -> Result<Self> {
if config.dimensions == 0 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: "dimensions must be > 0".to_string(),
}));
}
if config.dimensions > MAX_VECTOR_DIMENSIONS {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"dimensions {} exceeds maximum allowed {}",
config.dimensions, MAX_VECTOR_DIMENSIONS
),
}));
}
if config.m == 0 || config.m > 64 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!("M must be in range [1, 64], got {}", config.m),
}));
}
if config.ef_construction < 10 || config.ef_construction > 4096 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"ef_construction must be in range [10, 4096], got {}",
config.ef_construction
),
}));
}
if config.ef_search < 1 || config.ef_search > 4096 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"ef_search must be in range [1, 4096], got {}",
config.ef_search
),
}));
}
// Custom metrics take F32 slices; quantized storage would hand them
// mis-typed buffers.
if config.custom_metric.is_some() && config.quantization != Quantization::F32 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"Custom metrics are only supported with F32 quantization (requested {:?}). \
Using other quantization levels with custom metrics causes memory safety issues.",
config.quantization
),
}));
}
validate_metric_quantization(&config)?;
let options = IndexOptions {
dimensions: config.dimensions,
// Tanimoto maps to Cos here; the real distance is installed below.
metric: to_usearch_index_metric(config.metric),
quantization: to_usearch_scalar(config.quantization),
connectivity: config.m,
expansion_add: config.ef_construction,
expansion_search: config.ef_search,
multi: false,
};
let mut index = Index::new(&options).map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to create usearch index: {}",
e
)))
})?;
// Reserve capacity up-front; adds require reserved space in usearch.
let capacity_to_reserve = if config.capacity > 0 {
config.capacity
} else {
1024
};
index.reserve(capacity_to_reserve).map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to reserve capacity: {}",
e
)))
})?;
install_runtime_metric(&mut index, &config);
// Memory-mapped storage: persist the fresh index, then re-open it as a view.
if let StorageMode::MemoryMapped { ref path } = config.storage {
index
.save(path.to_str().ok_or_else(|| {
Error::Vector(VectorError::IndexError(
"Path contains invalid UTF-8".to_string(),
))
})?)
.map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to create memory-mapped index: {}",
e
)))
})?;
index
.view(path.to_str().ok_or_else(|| {
Error::Vector(VectorError::IndexError(
"Path contains invalid UTF-8".to_string(),
))
})?)
.map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to memory-map index: {}",
e
)))
})?;
}
Ok(HnswIndex {
inner: Arc::new(RwLock::new(index)),
config,
id_mapping: Arc::new(DashMap::new()),
reverse_mapping: Arc::new(DashMap::new()),
next_key: AtomicU64::new(0),
stats: Arc::new(IndexStats::default()),
max_k: MAX_K,
// NOTE(review): is_mmap stays false even for MemoryMapped storage here,
// unlike open_mmap(); confirm that mutating a view-backed index is intended.
is_mmap: false,
save_lock: Arc::new(RwLock::new(())),
entry_locks: (0..NUM_ENTRY_LOCKS).map(|_| Mutex::new(())).collect(),
})
}
/// Create a new index from `config`, routing through the builder so that
/// builder-side defaults and validation apply uniformly.
pub fn new(config: HnswConfig) -> Result<Self> {
HnswIndexBuilder::from_config(&config).build()
}
/// Adjust the search-time expansion factor (ef) on the live index.
pub fn set_ef_search(&self, ef_search: usize) {
    self.inner.read().change_expansion_search(ef_search);
}
/// Current search-time expansion factor.
pub fn get_ef_search(&self) -> usize {
    self.inner.read().expansion_search()
}
/// A copy of the configuration this index was built with.
pub fn config(&self) -> HnswConfig {
    self.config.clone()
}
/// HNSW connectivity parameter (M).
pub fn m(&self) -> usize {
    self.config.m
}
/// Snapshot of all (node id, usearch key) pairs as raw u64s.
pub(crate) fn get_id_mappings(&self) -> Vec<(u64, u64)> {
    let mut pairs = Vec::with_capacity(self.id_mapping.len());
    for entry in self.id_mapping.iter() {
        pairs.push((entry.key().as_u64(), *entry.value()));
    }
    pairs
}
/// Re-establish a persisted NodeId <-> key pair, bumping the key allocator
/// past it so future inserts never collide with restored keys.
pub(crate) fn restore_mapping(&self, node_id: crate::core::id::NodeId, usearch_key: u64) {
    self.id_mapping.insert(node_id, usearch_key);
    self.reverse_mapping.insert(usearch_key, node_id);
    self.next_key.fetch_max(usearch_key + 1, Ordering::SeqCst);
}
/// Load a saved index and its `.usearch.mappings` sidecar from `path`.
///
/// Validation order: config sanity (dimensions, custom-metric/quantization,
/// Tanimoto/quantization), the persisted file header, a dimensions
/// cross-check against what usearch actually loaded, then the sidecar
/// metadata.
///
/// # Errors
/// `InvalidVector` for bad config; `IndexError` for file/usearch failures
/// or any mismatch between config and the persisted index.
pub fn load(path: &Path, config: HnswConfig) -> Result<Self> {
if config.dimensions > MAX_VECTOR_DIMENSIONS {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"dimensions {} exceeds maximum allowed {}",
config.dimensions, MAX_VECTOR_DIMENSIONS
),
}));
}
if config.custom_metric.is_some() && config.quantization != Quantization::F32 {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"Custom metrics are only supported with F32 quantization (requested {:?}). \
Using other quantization levels with custom metrics causes memory safety issues.",
config.quantization
),
}));
}
validate_metric_quantization(&config)?;
// Cheap header check before paying for a full usearch load.
verify_index_header(path, config.dimensions, config.quantization)?;
let options = IndexOptions {
dimensions: config.dimensions,
metric: to_usearch_index_metric(config.metric),
quantization: to_usearch_scalar(config.quantization),
connectivity: config.m,
expansion_add: config.ef_construction,
expansion_search: config.ef_search,
multi: false,
};
let mut index = Index::new(&options).map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to create index for loading: {}",
e
)))
})?;
index
.load(path.to_str().ok_or_else(|| {
Error::Vector(VectorError::IndexError(
"Path contains invalid UTF-8".to_string(),
))
})?)
.map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to load index: {}",
e
)))
})?;
if index.dimensions() != config.dimensions {
return Err(Error::Vector(VectorError::IndexError(format!(
"Index dimension mismatch: usearch index has {}, config has {}",
index.dimensions(),
config.dimensions
))));
}
// Callback metrics (custom / Tanimoto) are not persisted; re-install them.
install_runtime_metric(&mut index, &config);
let mappings_path = path.with_extension("usearch.mappings");
let (id_mapping, reverse_mapping, max_key, metadata) =
load_mappings_with_integrity(&mappings_path)?;
persistence::validate_metadata(metadata, &config)?;
Ok(HnswIndex {
inner: Arc::new(RwLock::new(index)),
config,
id_mapping: Arc::new(id_mapping),
reverse_mapping: Arc::new(reverse_mapping),
// Resume key allocation past every persisted key to avoid collisions.
next_key: AtomicU64::new(max_key + 1),
stats: Arc::new(IndexStats::default()),
max_k: MAX_K,
is_mmap: false,
save_lock: Arc::new(RwLock::new(())),
entry_locks: (0..NUM_ENTRY_LOCKS).map(|_| Mutex::new(())).collect(),
})
}
/// Open an index file as a read-only memory-mapped view.
///
/// Geometry (dimensions, connectivity) is read back from the mapped file
/// itself; metric and quantization come from the mappings sidecar when
/// present. The resulting index rejects all mutation (`is_mmap == true`).
pub fn open_mmap(path: &Path) -> Result<Self> {
let mut index = Index::new(&IndexOptions::default()).map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to create index: {}",
e
)))
})?;
index
.view(path.to_str().ok_or_else(|| {
Error::Vector(VectorError::IndexError(
"Path contains invalid UTF-8".to_string(),
))
})?)
.map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to memory-map index: {}",
e
)))
})?;
// Trust the mapped file for geometry rather than caller-supplied config.
let dimensions = index.dimensions();
let connectivity = index.connectivity();
if dimensions > MAX_VECTOR_DIMENSIONS {
return Err(Error::Vector(VectorError::InvalidVector {
reason: format!(
"Memory-mapped index dimensions {} exceeds maximum allowed {}",
dimensions, MAX_VECTOR_DIMENSIONS
),
}));
}
let mappings_path = path.with_extension("usearch.mappings");
let (id_mapping, reverse_mapping, max_key, metadata) =
load_mappings_with_integrity(&mappings_path)?;
let (quantization, metric) = if let Some(meta) = metadata {
if meta.dimensions != dimensions {
return Err(Error::Vector(VectorError::IndexError(format!(
"Index dimension mismatch: usearch reported {}, metadata says {}",
dimensions, meta.dimensions
))));
}
(meta.quantization, meta.metric)
} else {
// No sidecar metadata: fall back to defaults. NOTE(review): if the file
// was built with a different metric/quantization this silently misreads
// it — confirm whether missing metadata should instead be an error.
(Quantization::default(), DistanceMetric::Cosine)
};
verify_index_header(path, dimensions, quantization)?;
let config = HnswConfig {
dimensions,
m: connectivity,
quantization,
metric,
storage: StorageMode::MemoryMapped {
path: path.to_path_buf(),
},
..Default::default()
};
validate_metric_quantization(&config)?;
install_runtime_metric(&mut index, &config);
Ok(HnswIndex {
inner: Arc::new(RwLock::new(index)),
config,
id_mapping: Arc::new(id_mapping),
reverse_mapping: Arc::new(reverse_mapping),
next_key: AtomicU64::new(max_key + 1),
stats: Arc::new(IndexStats::default()),
max_k: MAX_K,
is_mmap: true,
save_lock: Arc::new(RwLock::new(())),
entry_locks: (0..NUM_ENTRY_LOCKS).map(|_| Mutex::new(())).collect(),
})
}
/// Run `f`, using `tokio::task::block_in_place` when called from a
/// multi-threaded tokio runtime so blocking lock acquisition does not stall
/// an async worker thread. `block_in_place` is unsupported on
/// current-thread runtimes, so `f` runs inline there (and when the tokio
/// features are disabled).
pub(crate) fn maybe_block_in_place<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
#[cfg(any(feature = "tokio", feature = "embeddings"))]
if let Ok(handle) = tokio::runtime::Handle::try_current() {
#[allow(clippy::collapsible_if)]
if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
return tokio::task::block_in_place(f);
}
}
f()
}
/// Run `op`, retrying transient usearch thread-exhaustion failures up to
/// `MAX_SEARCH_ATTEMPTS` times with exponential backoff (1, 2, 4, ... ms).
/// Non-retryable errors — and the final retryable one — are wrapped with
/// `context` into `VectorError::IndexError`. Retry counts are recorded in
/// `stats`.
pub(crate) fn retry_usearch<F, T, E>(&self, mut op: F, context: &str) -> Result<T>
where
F: FnMut() -> std::result::Result<T, E>,
E: std::fmt::Display,
{
for attempt in 0..MAX_SEARCH_ATTEMPTS {
match op() {
Ok(val) => return Ok(val),
Err(e) => {
let error_msg = e.to_string();
if is_retryable_usearch_error(&error_msg) && attempt + 1 < MAX_SEARCH_ATTEMPTS {
self.stats.search_retries.fetch_add(1, Ordering::Relaxed);
// Exponential backoff: 1ms, 2ms, 4ms, ...
let delay_ms = 1u64 << attempt;
std::thread::sleep(std::time::Duration::from_millis(delay_ms));
continue;
}
if attempt > 0 {
self.stats
.search_retry_failures
.fetch_add(1, Ordering::Relaxed);
}
return Err(Error::Vector(VectorError::IndexError(format!(
"{}: {}",
context, e
))));
}
}
}
// Every iteration either returns or continues, and the final attempt cannot
// continue (attempt + 1 < MAX_SEARCH_ATTEMPTS fails), so the loop always
// returns before reaching here.
unreachable!("Retry loop should always return")
}
/// Ensure the underlying index can absorb `vectors_to_add` more vectors,
/// growing it if necessary.
///
/// Uses a double-checked pattern: an optimistic check under the read lock,
/// then a re-check under the write lock before reserving. The padding
/// leaves headroom so concurrent adds that slip in between a caller's check
/// and its insert do not immediately hit the capacity wall.
///
/// # Errors
/// `VectorError::IndexError` if usearch's `reserve` keeps failing after
/// retries.
pub(crate) fn check_and_expand_capacity(&self, vectors_to_add: usize) -> Result<()> {
    #[cfg(test)]
    if TEST_SKIP_CAPACITY_CHECK.load(Ordering::Relaxed) {
        return Ok(());
    }
    const CAPACITY_PADDING: usize = 1024;
    let index = self.inner.read();
    if index.size() + vectors_to_add + CAPACITY_PADDING <= index.capacity() {
        return Ok(());
    }
    drop(index);
    let index = self.inner.write();
    // Re-check: another writer may have expanded while we waited for the lock.
    if index.size() + vectors_to_add + CAPACITY_PADDING <= index.capacity() {
        return Ok(());
    }
    // Grow geometrically, but never below what this request actually needs:
    // doubling alone is insufficient when a single batch exceeds the current
    // capacity.
    let required = index.size() + vectors_to_add + CAPACITY_PADDING;
    let new_capacity = (index.capacity() * 2).max(required).max(1024);
    self.retry_usearch(|| index.reserve(new_capacity), "Failed to expand capacity")?;
    Ok(())
}
/// Write the usearch index to `path` and the NodeId mappings to the
/// `.usearch.mappings` sidecar.
///
/// `save_lock.write()` excludes all writers (add/remove hold `read()`), so
/// the saved index file and the mapping snapshot are mutually consistent.
/// The sidecar is written from the snapshot after the locks are released.
pub(crate) fn save_internal(&self, path: &Path) -> Result<()> {
let _save_guard = self.save_lock.write();
let index = self.inner.read();
// Snapshot the mappings while writers are blocked.
let mappings: Vec<(NodeId, u64)> = self
.id_mapping
.iter()
.map(|e| (*e.key(), *e.value()))
.collect();
let count = mappings.len();
index
.save(path.to_str().ok_or_else(|| {
Error::Vector(VectorError::IndexError(
"Path contains invalid UTF-8".to_string(),
))
})?)
.map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to save index: {}",
e
)))
})?;
drop(index);
drop(_save_guard);
let mappings_path = path.with_extension("usearch.mappings");
let file = File::create(&mappings_path).map_err(|e| {
Error::Vector(VectorError::IndexError(format!(
"Failed to create mappings file: {}",
e
)))
})?;
let mut writer = BufWriter::new(file);
write_mappings_to_writer(&mut writer, mappings.into_iter(), count, &self.config)
}
/// Convert raw usearch matches into `(NodeId, score)` pairs, with no result
/// limit and no filtering.
pub(crate) fn convert_matches(&self, matches: Matches) -> Vec<(NodeId, f32)> {
self.convert_matches_internal(matches, usize::MAX, |_| true)
}
/// Convert usearch matches into `(NodeId, score)` pairs.
///
/// Keys with no reverse mapping (e.g. nodes removed concurrently) are
/// skipped. At most `limit` results are returned, counting only entries
/// accepted by `predicate`. Raw usearch distances are normalized so that
/// higher scores mean closer:
/// - Cosine: similarity `1 - d`, with NaN mapped to 0 and clamped to [-1, 1]
/// - Euclidean / Haversine / Hamming: negated distance
/// - DotProduct / Tanimoto: `1 - d`
fn convert_matches_internal<F>(
    &self,
    matches: Matches,
    limit: usize,
    predicate: F,
) -> Vec<(NodeId, f32)>
where
    F: Fn(NodeId) -> bool,
{
    // One score transform per metric instead of three duplicated loops.
    let metric = self.config.metric;
    let score = |distance: f32| -> f32 {
        match metric {
            DistanceMetric::Cosine => {
                let sim = 1.0 - distance;
                if sim.is_nan() { 0.0 } else { sim.clamp(-1.0, 1.0) }
            }
            DistanceMetric::Euclidean | DistanceMetric::Haversine | DistanceMetric::Hamming => {
                -distance
            }
            DistanceMetric::DotProduct | DistanceMetric::Tanimoto => 1.0 - distance,
        }
    };
    let count = matches.keys.len();
    let mut results: Vec<(NodeId, f32)> = Vec::with_capacity(count.min(limit));
    for (key, &distance) in matches.keys.iter().zip(matches.distances.iter()) {
        // Skip keys whose node was removed between search and conversion.
        // The DashMap guard is dropped at the end of this statement, before
        // the predicate runs, so user callbacks cannot deadlock on the shard.
        let node_id = match self.reverse_mapping.get(key) {
            Some(entry) => *entry.value(),
            None => continue,
        };
        if predicate(node_id) {
            results.push((node_id, score(distance)));
            if results.len() >= limit {
                break;
            }
        }
    }
    results
}
/// Number of NodeId -> key mappings; may briefly differ from `len()` while
/// concurrent mutations are in flight.
pub fn len_mappings(&self) -> usize {
self.id_mapping.len()
}
}
impl VectorIndex for HnswIndex {
/// Insert or replace the vector for `id`.
///
/// Concurrency protocol: a per-id striped entry lock serializes same-id
/// operations; `save_lock.read()` excludes a concurrent save; the usearch
/// write lock covers the actual mutation. Update vs insert is decided by
/// the `id_mapping` entry, with re-checks under the write lock to detect
/// races (test hooks deliberately widen these windows).
///
/// # Errors
/// Rejected inside filter callbacks and on memory-mapped (read-only)
/// indexes; fails on invalid or wrong-dimension vectors, key-space
/// exhaustion, detected concurrent modification, or persistent usearch
/// errors.
fn add(&self, id: NodeId, vector: &[f32]) -> Result<()> {
if IN_FILTER_CALLBACK.with(|flag| flag.get()) {
return Err(Error::Vector(VectorError::IndexError(
"Cannot modify index from within a search_with_filter callback. \
This would cause a deadlock due to lock re-entrancy. \
Consider collecting modifications and applying them after the search completes."
.to_string(),
)));
}
if self.is_mmap {
return Err(Error::Vector(VectorError::IndexError(
"Cannot modify memory-mapped index (read-only)".to_string(),
)));
}
validate_vector(vector)?;
if vector.len() != self.config.dimensions {
return Err(Error::Vector(VectorError::DimensionMismatch {
expected: self.config.dimensions,
actual: vector.len(),
}));
}
self.maybe_block_in_place(|| {
self.check_and_expand_capacity(1)?;
let lock_idx = (id.as_u64() as usize) % self.entry_locks.len();
let _key_guard = self.entry_locks[lock_idx].lock();
let _save_guard = self.save_lock.read();
match self.id_mapping.entry(id) {
// Existing id: replace its vector in place under the same usearch key.
dashmap::mapref::entry::Entry::Occupied(entry) => {
let existing_key = *entry.get();
// Release the dashmap entry guard before taking the index write
// lock, keeping lock acquisition order consistent.
drop(entry);
#[cfg(test)]
{
if let Some(hook) = TEST_RACE_HOOK.with(|h| h.get()) {
hook(self, id);
}
}
let index = self.inner.write();
// Re-validate the mapping under the write lock: another thread may
// have removed or remapped the id after the entry guard was dropped.
if let Some(current_entry) = self.id_mapping.get(&id) {
if *current_entry != existing_key {
return Err(Error::Vector(VectorError::IndexError(
"Concurrent modification detected during update (mapping changed)"
.to_string(),
)));
}
} else {
return Err(Error::Vector(VectorError::IndexError(
"Concurrent modification detected during update (node removed)"
.to_string(),
)));
}
if index.contains(existing_key) {
self.retry_usearch(
|| index.remove(existing_key),
"Failed to remove existing vector",
)?;
} else if index.size() >= index.capacity() {
// Mapping exists but the vector is gone and the index is full:
// grow before re-adding.
let new_capacity = (index.capacity() * 2).max(1024);
self.retry_usearch(
|| index.reserve(new_capacity),
"Failed to expand capacity (race recovery)",
)?;
}
if let Err(e) = self
.retry_usearch(|| index.add(existing_key, vector), "Failed to add vector")
{
// usearch may still consider the key present; force remove, re-add.
if e.to_string().contains("Duplicate keys") {
self.retry_usearch(
|| index.remove(existing_key),
"Failed to force remove existing vector",
)?;
self.retry_usearch(
|| index.add(existing_key, vector),
"Failed to add vector after force remove",
)?;
} else {
return Err(e);
}
}
self.stats.vectors_added.fetch_add(1, Ordering::Relaxed);
Ok(())
}
// New id: allocate a fresh usearch key, insert, then publish the mapping.
dashmap::mapref::entry::Entry::Vacant(entry) => {
const MAX_VALID_KEY: u64 = u64::MAX - 1000;
// Drop the vacant entry guard before the CAS loop / write lock so a
// dashmap shard is never held across other locks.
drop(entry);
let key = loop {
let current = self.next_key.load(Ordering::SeqCst);
if current > MAX_VALID_KEY {
return Err(Error::Vector(VectorError::IndexError(
"Maximum number of vectors exceeded (key overflow protection)"
.to_string(),
)));
}
match self.next_key.compare_exchange(
current,
current + 1,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(key) => break key,
Err(_) => continue,
}
};
let index = self.inner.write();
if index.size() >= index.capacity() {
let new_capacity = (index.capacity() * 2).max(1024);
self.retry_usearch(
|| index.reserve(new_capacity),
"Failed to expand capacity (race recovery)",
)?;
}
self.retry_usearch(|| index.add(key, vector), "Failed to add vector")?;
#[cfg(test)]
{
if let Some(hook) = TEST_RACE_HOOK.with(|h| h.get()) {
hook(self, id);
}
}
// Publish the mapping; if another thread won the race for this id,
// roll back our usearch insert and report the conflict.
let race_detected = match self.id_mapping.entry(id) {
dashmap::mapref::entry::Entry::Occupied(_) => true,
dashmap::mapref::entry::Entry::Vacant(e) => {
e.insert(key);
false
}
};
if race_detected {
self.retry_usearch(
|| index.remove(key),
"Failed to rollback vector after concurrent add",
)?;
return Err(Error::Vector(VectorError::IndexError(
"Concurrent add detected for same NodeId, vector already exists"
.to_string(),
)));
}
self.reverse_mapping.insert(key, id);
self.stats.vectors_added.fetch_add(1, Ordering::Relaxed);
drop(index);
Ok(())
}
}
})
}
/// Remove the vector for `id`; removing an unknown id is a silent no-op.
///
/// NOTE(review): the NodeId mapping is deleted before the usearch entry, so
/// if `index.remove` then fails the vector is orphaned under a now
/// unreachable key — confirm whether the mapping should be restored on
/// error.
fn remove(&self, id: NodeId) -> Result<()> {
if IN_FILTER_CALLBACK.with(|flag| flag.get()) {
return Err(Error::Vector(VectorError::IndexError(
"Cannot modify index from within a search_with_filter callback. \
This would cause a deadlock due to lock re-entrancy. \
Consider collecting modifications and applying them after the search completes."
.to_string(),
)));
}
if self.is_mmap {
return Err(Error::Vector(VectorError::IndexError(
"Cannot modify memory-mapped index (read-only)".to_string(),
)));
}
self.maybe_block_in_place(|| {
// Same lock order as add(): entry stripe, then save lock, then index.
let lock_idx = (id.as_u64() as usize) % self.entry_locks.len();
let _key_guard = self.entry_locks[lock_idx].lock();
let _save_guard = self.save_lock.read();
if let Some((_, key)) = self.id_mapping.remove(&id) {
self.reverse_mapping.remove(&key);
let index = self.inner.write();
self.retry_usearch(|| index.remove(key), "Failed to remove vector")?;
self.stats.vectors_removed.fetch_add(1, Ordering::Relaxed);
}
Ok(())
})
}
/// k-nearest-neighbor search, returning up to `k` `(NodeId, score)` pairs
/// where higher scores mean closer (see `convert_matches` for the
/// per-metric normalization). `k` is capped at `max_k`.
///
/// # Errors
/// Rejected inside filter callbacks; fails on invalid or wrong-dimension
/// queries, or if usearch keeps failing after retries.
fn search(&self, query: &[f32], k: usize) -> Result<Vec<(NodeId, f32)>> {
if IN_FILTER_CALLBACK.with(|flag| flag.get()) {
return Err(Error::Vector(VectorError::IndexError(
"Cannot perform search from within a search_with_filter callback. \
This prevents deadlocks when concurrent writers are pending."
.to_string(),
)));
}
validate_vector(query)?;
if query.len() != self.config.dimensions {
return Err(Error::Vector(VectorError::DimensionMismatch {
expected: self.config.dimensions,
actual: query.len(),
}));
}
let k_capped = k.min(self.max_k);
self.maybe_block_in_place(|| {
// The read lock is acquired inside the retry closure so each attempt
// re-acquires it fresh instead of pinning readers across backoff sleeps.
let matches = self.retry_usearch(
|| {
let index = self.inner.read();
index.search(query, k_capped)
},
"Search failed",
)?;
self.stats
.searches_performed
.fetch_add(1, Ordering::Relaxed);
let results = self.convert_matches(matches);
Ok(results)
})
}
/// k-NN search returning only nodes accepted by `predicate`.
///
/// Over-fetch strategy: start with `k` candidates and double the candidate
/// count (bounded by index size and `max_k`) until `k` accepted results
/// exist or the whole candidate space has been scanned. The predicate runs
/// under `FilterCallbackGuard`, so re-entrant index operations error out
/// instead of deadlocking.
fn search_with_filter<F>(
&self,
query: &[f32],
k: usize,
predicate: F,
) -> Result<Vec<(NodeId, f32)>>
where
F: Fn(&NodeId) -> bool + Send + Sync,
{
if IN_FILTER_CALLBACK.with(|flag| flag.get()) {
return Err(Error::Vector(VectorError::IndexError(
"Cannot perform search_with_filter from within a search_with_filter callback. \
This prevents deadlocks when concurrent writers are pending."
.to_string(),
)));
}
validate_vector(query)?;
if query.len() != self.config.dimensions {
return Err(Error::Vector(VectorError::DimensionMismatch {
expected: self.config.dimensions,
actual: query.len(),
}));
}
let k_capped = k.min(self.max_k);
if k_capped == 0 {
return Ok(Vec::new());
}
let max_candidates = self.len().min(self.max_k);
if max_candidates == 0 {
return Ok(Vec::new());
}
self.maybe_block_in_place(|| {
let mut candidate_k = k_capped.min(max_candidates);
loop {
let filtered = {
let matches = self.retry_usearch(
|| {
let index = self.inner.read();
index.search(query, candidate_k)
},
"Filtered search failed",
)?;
self.convert_matches_internal(matches, k_capped, |id| {
// Guard marks this thread so nested index calls fail fast.
let _guard = FilterCallbackGuard::new();
predicate(&id)
})
};
// Done when enough results passed the filter, or every available
// candidate has already been examined.
if filtered.len() >= k_capped || candidate_k == max_candidates {
self.stats
.searches_performed
.fetch_add(1, Ordering::Relaxed);
return Ok(filtered);
}
let next_candidate_k = (candidate_k.saturating_mul(2)).min(max_candidates);
if next_candidate_k == candidate_k {
// Cannot widen further; return what we have.
self.stats
.searches_performed
.fetch_add(1, Ordering::Relaxed);
return Ok(filtered);
}
candidate_k = next_candidate_k;
}
})
}
/// Number of vectors currently stored in the usearch index.
fn len(&self) -> usize {
self.inner.read().size()
}
/// Configured vector dimensionality.
fn dimensions(&self) -> usize {
self.config.dimensions
}
/// Configured distance metric.
fn distance_metric(&self) -> DistanceMetric {
self.config.metric
}
/// Add many vectors by delegating to `add` one item at a time; stops at the
/// first error.
fn add_batch(&self, items: &[(NodeId, Vec<f32>)]) -> Result<()> {
    items.iter().try_for_each(|(id, vec)| self.add(*id, vec))
}
/// Remove many node ids, delegating to `remove`; stops at the first error.
fn remove_batch(&self, ids: &[NodeId]) -> Result<()> {
    ids.iter().try_for_each(|&id| self.remove(id))
}
/// Persist the index and its mappings (see `save_internal`).
///
/// Disallowed inside filter callbacks (lock re-entrancy). On a
/// multi-threaded tokio runtime the blocking save runs under
/// `block_in_place`; current-thread runtimes do not support that, so the
/// save runs inline there.
fn save(&self, path: &Path) -> Result<()> {
if IN_FILTER_CALLBACK.with(|flag| flag.get()) {
return Err(Error::Vector(VectorError::IndexError(
"Cannot save index from within a search_with_filter callback. \
This would cause a deadlock due to lock re-entrancy. \
Consider saving after the search completes."
.to_string(),
)));
}
#[cfg(any(feature = "tokio", feature = "embeddings"))]
if let Ok(handle) = tokio::runtime::Handle::try_current() {
#[allow(clippy::collapsible_if)]
if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
return tokio::task::block_in_place(|| self.save_internal(path));
}
}
self.save_internal(path)
}
/// Memory usage reported by the underlying usearch index.
fn memory_usage(&self) -> usize {
self.inner.read().memory_usage()
}
/// Quantization level vectors are stored with.
fn quantization(&self) -> Quantization {
self.config.quantization
}
/// Intentional no-op: usearch manages its own storage layout, so there is
/// nothing for this layer to compact.
fn compact(&self) -> Result<()> {
Ok(())
}
}