use super::batch_schedule::compute_batch_ef_schedule;
use super::distance::DistanceEngine;
use super::graph::{NativeHnsw, NO_ENTRY_POINT};
use super::layer::NodeId;
use crate::distance::DistanceMetric;
use rayon::prelude::*;
use std::path::Path;
/// Backend interface for a native HNSW index.
///
/// Implementors must be `Send + Sync`. The `NativeHnsw<D>` implementation in
/// this file forwards every method to an inherent method on `NativeHnsw`.
pub trait NativeHnswBackend: Send + Sync {
/// Looks up neighbours of `query` and returns them as [`NativeNeighbour`]s.
///
/// NOTE(review): `k` and `ef_search` are presumably the result count and the
/// search candidate-list size — confirm against the implementor.
fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Vec<NativeNeighbour>;
/// Inserts a single vector.
///
/// `data` pairs the vector with a `usize`; the `NativeHnsw` implementation in
/// this file treats that value as the expected node id and logs a warning
/// when the id actually assigned differs.
fn insert(&self, data: (&[f32], usize)) -> crate::error::Result<()>;
/// Inserts a batch of `(vector, usize)` pairs and returns one `usize` per
/// inserted entry (the assigned node ids in the `NativeHnsw` implementation).
fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>>;
/// Sets the searching-mode flag. The `NativeHnsw` implementation in this
/// file ignores the flag.
fn set_searching_mode(&mut self, mode: bool);
/// NOTE(review): presumably persists the index under `path` using
/// `basename` for the file name(s) — the exact layout is defined by the
/// implementor, not by this trait.
fn file_dump(&self, path: &Path, basename: &str) -> std::io::Result<()>;
/// Converts a raw distance into the score value exposed by this backend.
fn transform_score(&self, raw_distance: f32) -> f32;
/// Returns the backend's length (presumably the number of stored items —
/// confirm against the implementor).
fn len(&self) -> usize;
/// Returns `true` exactly when [`len`](Self::len) is `0`.
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// One search hit: an item identifier together with its distance value.
#[derive(Debug, Clone, PartialEq)]
pub struct NativeNeighbour {
    /// Identifier of the matched item, as returned by the index search.
    pub d_id: usize,
    /// Distance value reported for this match.
    pub distance: f32,
}

impl NativeNeighbour {
    /// Builds a neighbour from an identifier and a distance; both values are
    /// stored unchanged.
    #[must_use]
    pub fn new(d_id: usize, distance: f32) -> Self {
        NativeNeighbour { d_id, distance }
    }
}
impl<D: DistanceEngine + Send + Sync> NativeHnsw<D> {
/// Inserts a batch of vectors and returns the node id assigned to each entry.
///
/// Batches with fewer than 100 entries are inserted one at a time through
/// `insert`. Larger batches are allocated in one step, connected in chunks,
/// and then finalized. The `usize` paired with each vector is not read here.
///
/// # Errors
///
/// Propagates any error returned by `insert`, `allocate_batch` or
/// `connect_batch_chunked`.
pub fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>> {
    // Below this size the batch is inserted sequentially.
    const SEQUENTIAL_CUTOFF: usize = 100;

    if data.len() < SEQUENTIAL_CUTOFF {
        let mut ids = Vec::with_capacity(data.len());
        for &(vector, _) in data {
            let id = self.insert(vector)?;
            ids.push(id);
        }
        return Ok(ids);
    }

    let vectors: Vec<&[f32]> = data.iter().map(|&(vector, _)| vector).collect();
    let (assignments, processed) = self.allocate_batch(&vectors)?;

    // Nothing was allocated: there is nothing to connect or report.
    let first_node = match assignments.first() {
        Some(&(node_id, _)) => node_id,
        None => return Ok(Vec::new()),
    };

    // When the first assignment becomes the entry point it is skipped by the
    // connect pass (`connect_start` is 1 in that case).
    let connect_start = self.bootstrap_entry_point(&assignments);
    let pending = &assignments[connect_start..];
    self.connect_batch_chunked(pending, &processed, first_node)?;
    self.finalize_batch(&assignments, connect_start);

    #[cfg(feature = "gpu")]
    self.invalidate_gpu_caches();

    Ok(assignments.iter().map(|&(node_id, _)| node_id).collect())
}
/// Promotes the first assignment to entry point when none is set yet.
///
/// Returns `1` when the first assignment was promoted here and `0` when an
/// entry point already existed; `parallel_insert` uses the value as the index
/// at which the connect pass starts.
///
/// # Panics
///
/// Panics if `assignments` is empty while no entry point is set.
fn bootstrap_entry_point(&self, assignments: &[(NodeId, usize)]) -> usize {
    use std::sync::atomic::Ordering;

    let current = self.entry_point.load(Ordering::Acquire);
    if current != NO_ENTRY_POINT {
        return 0;
    }

    let (seed_id, seed_layer) = assignments[0];
    self.promote_entry_point(seed_id, seed_layer);
    1
}
/// Finishes a batch insert.
///
/// Offers the assignment with the highest layer to `promote_entry_point`,
/// then adds `connect_start` to `count`. The connect pass only counts the
/// entries it was handed (`assignments[connect_start..]`), so this covers the
/// entries that were skipped.
fn finalize_batch(&self, assignments: &[(NodeId, usize)], connect_start: usize) {
    use std::sync::atomic::Ordering;

    let top = assignments.iter().max_by_key(|entry| entry.1);
    if let Some(&(node_id, layer)) = top {
        self.promote_entry_point(node_id, layer);
    }

    if connect_start == 0 {
        return;
    }
    self.count.fetch_add(connect_start, Ordering::Relaxed);
}
/// Chooses the chunk length used when connecting a batch of `batch_len` nodes.
///
/// The result is one fiftieth of the batch length, kept within `1000..=5000`.
#[must_use]
pub(in crate::index::hnsw::native) fn compute_chunk_size(batch_len: usize) -> usize {
    const MIN_CHUNK: usize = 1000;
    const MAX_CHUNK: usize = 5000;

    let proportional = batch_len / 50;
    proportional.max(MIN_CHUNK).min(MAX_CHUNK)
}
/// Derives an `(effective_ef, stagnation)` pair from the batch size.
///
/// Batches of at most 1 000 entries get `(ef_construction, 0)`. Larger batches
/// scale `ef_construction` by 0.85 (up to 10 000 entries), 0.75 (up to 50 000)
/// or 0.60 (beyond that). The scaled value is never allowed below
/// `max_connections * 4`, and `stagnation` is half of the effective value.
// Not referenced from the code visible in this part of the file, hence the
// `dead_code` allowance.
#[allow(dead_code)]
#[must_use]
pub(in crate::index::hnsw::native) fn adaptive_ef_for_batch(
    &self,
    batch_size: usize,
) -> (usize, usize) {
    let base = self.ef_construction;
    let scale: f64 = match batch_size {
        0..=1_000 => return (base, 0),
        1_001..=10_000 => 0.85,
        10_001..=50_000 => 0.75,
        _ => 0.60,
    };

    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let scaled = (base as f64 * scale) as usize;

    let floor = self.max_connections * 4;
    let effective_ef = if scaled < floor { floor } else { scaled };
    (effective_ef, effective_ef / 2)
}
fn connect_batch_chunked(
&self,
assignments: &[(NodeId, usize)],
processed: &[std::borrow::Cow<'_, [f32]>],
first_node: NodeId,
) -> crate::error::Result<()> {
let chunk_size = Self::compute_chunk_size(assignments.len());
let schedule = compute_batch_ef_schedule(
self.ef_construction,
assignments.len(),
self.max_connections,
);
let mut nodes_connected: usize = 0;
for chunk in assignments.chunks(chunk_size) {
let loaded = self.entry_point.load(std::sync::atomic::Ordering::Acquire);
let ep_id = if loaded == NO_ENTRY_POINT {
first_node
} else {
loaded
};
let chunk_offset = nodes_connected;
chunk.par_iter().enumerate().try_for_each(
|(i, (node_id, layer))| -> crate::error::Result<()> {
let batch_idx = node_id - first_node;
let query: &[f32] = &processed[batch_idx];
let current_ep = self.greedy_descent_upper_layers(query, *layer, ep_id);
let ef = schedule.ef_for_position(chunk_offset + i);
let stagnation = ef / 2;
self.connect_node_with_ef(*node_id, query, *layer, current_ep, ef, stagnation);
Ok(())
},
)?;
if let Some(best) = chunk.iter().max_by_key(|(_, layer)| *layer) {
self.promote_entry_point(best.0, best.1);
}
self.count
.fetch_add(chunk.len(), std::sync::atomic::Ordering::Relaxed);
nodes_connected += chunk.len();
}
Ok(())
}
/// Accepts a searching-mode flag and ignores it; calling this has no effect.
pub fn set_searching_mode(&mut self, _mode: bool) {}
#[must_use]
pub fn search_neighbours(
&self,
query: &[f32],
k: usize,
ef_search: usize,
) -> Vec<NativeNeighbour> {
self.search(query, k, ef_search)
.into_iter()
.map(|(id, dist)| NativeNeighbour::new(id, dist))
.collect()
}
/// Converts a raw distance into a score according to the configured metric.
///
/// - Cosine: `1.0 - raw_distance`, clamped to `[0.0, 1.0]`.
/// - Euclidean: the square root of `raw_distance` (presumably the raw value is
///   a squared distance — confirm against the distance engine).
/// - Dot product: the negated raw value.
/// - Hamming and Jaccard: the raw value unchanged.
#[must_use]
pub fn transform_score(&self, raw_distance: f32) -> f32 {
    let metric = self.distance.metric();
    match metric {
        DistanceMetric::Hamming | DistanceMetric::Jaccard => raw_distance,
        DistanceMetric::DotProduct => -raw_distance,
        DistanceMetric::Euclidean => f32::sqrt(raw_distance),
        DistanceMetric::Cosine => {
            let similarity = 1.0 - raw_distance;
            similarity.clamp(0.0, 1.0)
        }
    }
}
}
/// Implements the backend trait by forwarding to the inherent methods of
/// `NativeHnsw`. Calls are path-qualified so that it is explicit they resolve
/// to the inherent method rather than to the trait method being defined.
impl<D: DistanceEngine + Send + Sync> NativeHnswBackend for NativeHnsw<D> {
    /// Forwards to the inherent `search_neighbours`.
    fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Vec<NativeNeighbour> {
        NativeHnsw::search_neighbours(self, query, k, ef_search)
    }

    /// Inserts one vector through the inherent `insert`. A difference between
    /// the assigned id and the expected one is logged as a warning, not
    /// returned as an error.
    fn insert(&self, data: (&[f32], usize)) -> crate::error::Result<()> {
        let (vector, expected_idx) = data;
        let assigned_id = NativeHnsw::insert(self, vector)?;
        if assigned_id == expected_idx {
            return Ok(());
        }
        tracing::warn!("NativeHnsw node_id mismatch: expected {expected_idx}, got {assigned_id}");
        Ok(())
    }

    /// Forwards to the inherent `parallel_insert`.
    fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>> {
        NativeHnsw::parallel_insert(self, data)
    }

    /// Forwards to the inherent `set_searching_mode`.
    fn set_searching_mode(&mut self, mode: bool) {
        NativeHnsw::set_searching_mode(self, mode);
    }

    /// Forwards to the inherent `file_dump`.
    fn file_dump(&self, path: &Path, basename: &str) -> std::io::Result<()> {
        NativeHnsw::file_dump(self, path, basename)
    }

    /// Forwards to the inherent `transform_score`.
    fn transform_score(&self, raw_distance: f32) -> f32 {
        NativeHnsw::transform_score(self, raw_distance)
    }

    /// Forwards to the inherent `len`.
    fn len(&self) -> usize {
        NativeHnsw::len(self)
    }
}