use super::distance::DistanceEngine;
use super::graph::NativeHnsw;
use super::layer::{Layer, NodeId};
use crate::distance::DistanceMetric;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::Path;
/// Parsed contents of a `.graph` file.
///
/// `read_graph_header_fields` produces this with `layers` still empty;
/// `load_graph_file` then fills `layers` from the rest of the same file.
struct LoadedGraph {
    /// Number of layers declared in the header.
    num_layers: usize,
    /// Header value of `max_connections`.
    max_connections: usize,
    /// Header value of `max_connections_0`.
    max_connections_0: usize,
    /// Header value of `ef_construction`.
    ef_construction: usize,
    /// Node id stored in the header as the entry point.
    entry_point: usize,
    /// Header value of `max_layer`.
    max_layer: usize,
    /// Per-layer adjacency data, in file order.
    layers: Vec<Layer>,
}
/// Scalar header fields of a `.graph` file, already narrowed to their on-disk widths.
///
/// `write_graph_header` writes them in exactly this order, after the format
/// version word and before the trailing vector count.
struct GraphFileHeader {
    /// Number of layers that follow the header.
    num_layers: u32,
    /// Index setting `max_connections`, truncated to 32 bits.
    max_connections: u32,
    /// Index setting `max_connections_0`, truncated to 32 bits.
    max_connections_0: u32,
    /// Index setting `ef_construction`, truncated to 32 bits.
    ef_construction: u32,
    /// Entry-point node id (written as 0 when the index has none).
    entry_point: u64,
    /// Highest layer index, truncated to 32 bits.
    max_layer: u32,
}
/// Reads one little-endian `u32` from `reader` and widens it to `usize`.
///
/// Generic over any [`Read`] source, so it works with the buffered file readers
/// used by the loaders as well as with in-memory readers such as `std::io::Cursor`.
///
/// # Errors
/// Propagates the error from `read_exact`, including `UnexpectedEof` when fewer
/// than 4 bytes remain.
fn read_u32_field<R: Read + ?Sized>(reader: &mut R) -> std::io::Result<usize> {
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf)?;
    // Lossless on 32- and 64-bit targets.
    Ok(u32::from_le_bytes(buf) as usize)
}
/// Reads one little-endian `u64` from `reader` and converts it to `usize`.
///
/// Generic over any [`Read`] source (buffered files, `std::io::Cursor`, ...).
///
/// # Errors
/// Propagates the error from `read_exact` (e.g. `UnexpectedEof`). Returns
/// `InvalidData` when the stored value does not fit in `usize` on this platform
/// (possible on 32-bit targets). The original `as usize` cast silently
/// truncated in that case.
fn read_u64_field<R: Read + ?Sized>(reader: &mut R) -> std::io::Result<usize> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?;
    let value = u64::from_le_bytes(buf);
    usize::try_from(value).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("stored value {value} does not fit in usize on this platform"),
        )
    })
}
/// Backend interface over the native HNSW index.
///
/// Implementors must be `Send + Sync`. Only `set_searching_mode` needs
/// exclusive access (`&mut self`); every other operation works through `&self`.
pub trait NativeHnswBackend: Send + Sync {
    /// Searches for the `k` nearest neighbours of `query`; `ef_search` is the
    /// search-time candidate-list parameter passed through to the index.
    fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Vec<NativeNeighbour>;

    /// Inserts one vector; `data.1` is the caller's expected id for it.
    fn insert(&self, data: (&[f32], usize)) -> crate::error::Result<()>;

    /// Inserts a batch of `(vector, expected id)` pairs and returns the ids
    /// the index actually assigned.
    fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>>;

    /// Toggles the backend's searching mode.
    fn set_searching_mode(&mut self, mode: bool);

    /// Writes the index to files under `path`, named after `basename`.
    fn file_dump(&self, path: &Path, basename: &str) -> std::io::Result<()>;

    /// Converts a raw distance from the index into the score exposed to callers.
    fn transform_score(&self, raw_distance: f32) -> f32;

    /// Number of elements in the index.
    fn len(&self) -> usize;

    /// `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
/// A single search hit: node id plus the distance value attached to it.
#[derive(Debug, Clone, PartialEq)]
pub struct NativeNeighbour {
    /// Id of the matched node.
    pub d_id: usize,
    /// Distance associated with the hit (search results pass the raw distance).
    pub distance: f32,
}
impl NativeNeighbour {
    /// Creates a hit record from a node id and its distance.
    #[must_use]
    pub fn new(d_id: usize, distance: f32) -> Self {
        NativeNeighbour { distance, d_id }
    }
}
impl<D: DistanceEngine + Send + Sync> NativeHnsw<D> {
pub fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>> {
if data.len() < 100 {
let mut assigned_ids = Vec::with_capacity(data.len());
for (vec, _idx) in data {
assigned_ids.push(self.insert(vec)?);
}
return Ok(assigned_ids);
}
let vectors: Vec<&[f32]> = data.iter().map(|(v, _)| *v).collect();
let (assignments, processed) = self.allocate_batch(&vectors)?;
if assignments.is_empty() {
return Ok(Vec::new());
}
let first_node = assignments[0].0;
let connect_start = self.bootstrap_entry_point(&assignments);
self.connect_batch_chunked(&assignments[connect_start..], &processed, first_node)?;
self.finalize_batch(&assignments, connect_start);
let assigned_ids: Vec<usize> = assignments.iter().map(|(node_id, _)| *node_id).collect();
Ok(assigned_ids)
}
/// Makes the first batch node the entry point when the index has none yet.
///
/// Returns how many leading entries of `assignments` were consumed this way:
/// `1` when the first node was promoted, `0` when an entry point already existed.
/// Indexes `assignments[0]`, so it panics on an empty slice when no entry point
/// is set (`parallel_insert` only calls it with a non-empty slice).
fn bootstrap_entry_point(&self, assignments: &[(NodeId, usize)]) -> usize {
    // Evaluated as its own statement so the read guard is dropped before
    // `promote_entry_point` runs (which may need to lock `entry_point` itself).
    let has_entry_point = self.entry_point.read().is_some();
    if has_entry_point {
        return 0;
    }
    let (first_node, first_layer) = assignments[0];
    self.promote_entry_point(first_node, first_layer);
    1
}
/// Final bookkeeping after a batch has been linked into the graph.
///
/// Offers the highest-layer node of the whole batch to `promote_entry_point`
/// (on ties, the last such node, as `Iterator::max_by_key` picks). Then it adds
/// `connect_start` to the node count, covering the leading bootstrap node(s)
/// that were not part of the chunked connect step.
fn finalize_batch(&self, assignments: &[(NodeId, usize)], connect_start: usize) {
    let tallest = assignments.iter().max_by_key(|&&(_, layer)| layer);
    if let Some(&(node_id, layer)) = tallest {
        self.promote_entry_point(node_id, layer);
    }
    if connect_start != 0 {
        self.count
            .fetch_add(connect_start, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Number of nodes linked per sequential chunk during a batch insert.
///
/// Aims for roughly `batch_len / 50` but never goes below 1000 or above 5000.
#[must_use]
pub(in crate::index::hnsw::native) fn compute_chunk_size(batch_len: usize) -> usize {
    const TARGET_CHUNKS: usize = 50;
    const MIN_CHUNK: usize = 1000;
    const MAX_CHUNK: usize = 5000;
    let proportional = batch_len / TARGET_CHUNKS;
    proportional.max(MIN_CHUNK).min(MAX_CHUNK)
}
fn connect_batch_chunked(
&self,
assignments: &[(NodeId, usize)],
processed: &[std::borrow::Cow<'_, [f32]>],
first_node: NodeId,
) -> crate::error::Result<()> {
let chunk_size = Self::compute_chunk_size(assignments.len());
for chunk in assignments.chunks(chunk_size) {
let ep_id = (*self.entry_point.read()).unwrap_or(first_node);
chunk
.par_iter()
.try_for_each(|(node_id, layer)| -> crate::error::Result<()> {
let batch_idx = node_id - first_node;
let query: &[f32] = &processed[batch_idx];
let current_ep = self.greedy_descent_upper_layers(query, *layer, ep_id);
self.connect_node(*node_id, query, *layer, current_ep);
Ok(())
})?;
if let Some(best) = chunk.iter().max_by_key(|(_, layer)| *layer) {
self.promote_entry_point(best.0, best.1);
}
self.count
.fetch_add(chunk.len(), std::sync::atomic::Ordering::Relaxed);
}
Ok(())
}
/// No-op: this implementation ignores the searching-mode flag.
pub fn set_searching_mode(&mut self, _mode: bool) {}
#[must_use]
pub fn search_neighbours(
&self,
query: &[f32],
k: usize,
ef_search: usize,
) -> Vec<NativeNeighbour> {
self.search(query, k, ef_search)
.into_iter()
.map(|(id, dist)| NativeNeighbour::new(id, dist))
.collect()
}
/// Converts a raw distance from the engine into the score reported to callers.
///
/// * `Cosine`: `1 - d`, clamped to `[0, 1]`.
/// * `Euclidean`, `Hamming`, `Jaccard`: the distance unchanged.
/// * `DotProduct`: the negated distance.
#[must_use]
pub fn transform_score(&self, raw_distance: f32) -> f32 {
    let metric = self.distance.metric();
    match metric {
        DistanceMetric::DotProduct => -raw_distance,
        DistanceMetric::Cosine => {
            let similarity = 1.0 - raw_distance;
            similarity.clamp(0.0, 1.0)
        }
        DistanceMetric::Euclidean | DistanceMetric::Hamming | DistanceMetric::Jaccard => {
            raw_distance
        }
    }
}
/// Persists the index as `<basename>.vectors` and `<basename>.graph` inside `path`.
///
/// The vectors file is written first, and its vector count is repeated in the
/// graph header.
///
/// # Errors
/// Returns the first I/O error. If writing the graph file fails, the vectors
/// file has already been written.
pub fn file_dump(&self, path: &Path, basename: &str) -> std::io::Result<()> {
    self.dump_vectors_file(path, basename)
        .and_then(|count| self.dump_graph_file(path, basename, count))
}
/// Writes `<basename>.vectors` into `path` and returns the vector count stored in its header.
///
/// Layout: the `write_vectors_header` header (version, count, dimension),
/// followed by the vector data. When no vector storage exists, the header
/// records `count = 0`, `dimension = 0` and no data follows.
/// The `vectors` read lock is held until the file has been flushed.
///
/// # Errors
/// Propagates failures from creating, writing, or flushing the file.
fn dump_vectors_file(&self, path: &Path, basename: &str) -> std::io::Result<u64> {
    let target = path.join(format!("{basename}.vectors"));
    let storage = self.vectors.read();
    let mut out = BufWriter::new(File::create(&target)?);
    let stored = storage.as_ref();
    // The on-disk dimension field is 32-bit; the cast is unchecked.
    #[allow(clippy::cast_possible_truncation)]
    let (count, dimension): (u64, u32) =
        stored.map_or((0, 0), |v| (v.len() as u64, v.dimension() as u32));
    Self::write_vectors_header(&mut out, count, dimension)?;
    if let Some(vectors) = stored {
        Self::write_vector_data(&mut out, vectors)?;
    }
    out.flush()?;
    Ok(count)
}
/// Writes the 16-byte `.vectors` header, all little-endian:
/// format version `u32` (always 1), vector count `u64`, dimension `u32`.
fn write_vectors_header(
    writer: &mut BufWriter<File>,
    count: u64,
    dimension: u32,
) -> std::io::Result<()> {
    const FORMAT_VERSION: u32 = 1;
    for field in [
        &FORMAT_VERSION.to_le_bytes()[..],
        &count.to_le_bytes()[..],
        &dimension.to_le_bytes()[..],
    ] {
        writer.write_all(field)?;
    }
    Ok(())
}
/// Streams every stored vector as consecutive little-endian `f32` values, in index order.
///
/// Indices for which `vectors.get(i)` yields `None` are skipped; nothing is
/// written in their place.
fn write_vector_data(
    writer: &mut BufWriter<File>,
    vectors: &crate::perf_optimizations::ContiguousVectors,
) -> std::io::Result<()> {
    let rows = (0..vectors.len()).filter_map(|index| vectors.get(index));
    for row in rows {
        for &component in row {
            writer.write_all(&component.to_le_bytes())?;
        }
    }
    Ok(())
}
/// Writes `<basename>.graph` into `path`: the header, then every layer's adjacency lists.
///
/// `count` is stored in the header as the vector count. The `layers` read lock
/// is held for the whole write. `entry_point` is read-locked only while the
/// header is built, and a missing entry point is stored as 0.
///
/// # Errors
/// Propagates failures from creating, writing, or flushing the file.
fn dump_graph_file(&self, path: &Path, basename: &str, count: u64) -> std::io::Result<()> {
    let destination = path.join(format!("{basename}.graph"));
    let layer_guard = self.layers.read();
    let mut out = BufWriter::new(File::create(&destination)?);
    // Header fields are narrowed to their on-disk widths without range checks.
    #[allow(clippy::cast_possible_truncation)]
    let header = GraphFileHeader {
        num_layers: layer_guard.len() as u32,
        max_connections: self.max_connections as u32,
        max_connections_0: self.max_connections_0 as u32,
        ef_construction: self.ef_construction as u32,
        entry_point: self.entry_point.read().unwrap_or(0) as u64,
        max_layer: self.max_layer.load(std::sync::atomic::Ordering::Relaxed) as u32,
    };
    Self::write_graph_header(&mut out, &header, count)?;
    Self::write_layer_data(&mut out, &layer_guard)?;
    out.flush()
}
/// Writes the fixed 40-byte `.graph` header, all fields little-endian, in this order:
///
/// | field               | type  |
/// |---------------------|-------|
/// | version (always 1)  | `u32` |
/// | `num_layers`        | `u32` |
/// | `max_connections`   | `u32` |
/// | `max_connections_0` | `u32` |
/// | `ef_construction`   | `u32` |
/// | `entry_point`       | `u64` |
/// | `max_layer`         | `u32` |
/// | `count`             | `u64` |
fn write_graph_header(
    writer: &mut BufWriter<File>,
    header: &GraphFileHeader,
    count: u64,
) -> std::io::Result<()> {
    const FORMAT_VERSION: u32 = 1;
    let mut emit = |bytes: &[u8]| writer.write_all(bytes);
    emit(&FORMAT_VERSION.to_le_bytes())?;
    emit(&header.num_layers.to_le_bytes())?;
    emit(&header.max_connections.to_le_bytes())?;
    emit(&header.max_connections_0.to_le_bytes())?;
    emit(&header.ef_construction.to_le_bytes())?;
    emit(&header.entry_point.to_le_bytes())?;
    emit(&header.max_layer.to_le_bytes())?;
    emit(&count.to_le_bytes())
}
/// Writes the adjacency lists of every layer, in layer order.
///
/// Per layer: node count (`u64`), then for each node its neighbour count
/// (`u32`) followed by that many neighbour ids (`u32`), all little-endian.
/// Each node's neighbour list is read-locked only while it is being written.
///
/// # Errors
/// Propagates write failures. Returns `InvalidInput` if a neighbour count or a
/// neighbour id does not fit in the 32-bit on-disk field. Previously such values
/// were silently truncated, producing a file that reloads with wrong edges. On
/// error, the bytes already written stay in the (incomplete) output.
fn write_layer_data(writer: &mut BufWriter<File>, layers: &[Layer]) -> std::io::Result<()> {
    for layer in layers {
        let num_nodes = layer.neighbors.len() as u64;
        writer.write_all(&num_nodes.to_le_bytes())?;
        for node_neighbors in &layer.neighbors {
            let neighbors = node_neighbors.read();
            let num_neighbors = u32::try_from(neighbors.len()).map_err(|_| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!(
                        "neighbour list of length {} exceeds the u32 graph format",
                        neighbors.len()
                    ),
                )
            })?;
            writer.write_all(&num_neighbors.to_le_bytes())?;
            for &neighbor in neighbors.iter() {
                let neighbor_u32 = u32::try_from(neighbor).map_err(|_| {
                    std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        format!("neighbour id {neighbor} exceeds the u32 graph format"),
                    )
                })?;
                writer.write_all(&neighbor_u32.to_le_bytes())?;
            }
        }
    }
    Ok(())
}
/// Rebuilds an index from the two files written by `file_dump`.
///
/// Reads `<basename>.vectors` and then `<basename>.graph` from `path`, and pairs
/// them with the supplied `distance` engine. Settings that are not stored on
/// disk are re-derived or reset:
/// * `level_mult` is recomputed as `1 / ln(max_connections)`.
/// * `alpha` is 1.0.
/// * `stagnation_limit` is `ef_construction / 4`.
/// * The RNG state gets a fixed seed.
/// * The pre-allocated capacity counter starts at 0.
///
/// The stored entry point is restored only when the vectors file holds at
/// least one vector.
///
/// # Errors
/// Any I/O or format error from either loader.
pub fn file_load(path: &Path, basename: &str, distance: D) -> std::io::Result<Self> {
    let (vectors, count) =
        Self::load_vectors_file(&path.join(format!("{basename}.vectors")))?;
    let graph = Self::load_graph_file(&path.join(format!("{basename}.graph")))?;

    let entry_point = (count > 0).then_some(graph.entry_point);
    let level_mult = 1.0 / (graph.max_connections as f64).ln();
    let stagnation_limit = graph.ef_construction / 4;

    Ok(Self {
        distance,
        vectors: parking_lot::RwLock::new(vectors),
        layers: parking_lot::RwLock::new(graph.layers),
        entry_point: parking_lot::RwLock::new(entry_point),
        max_layer: std::sync::atomic::AtomicUsize::new(graph.max_layer),
        count: std::sync::atomic::AtomicUsize::new(count),
        rng_state: std::sync::atomic::AtomicU64::new(0x5DEE_CE66_D1A4_B5B5),
        max_connections: graph.max_connections,
        max_connections_0: graph.max_connections_0,
        ef_construction: graph.ef_construction,
        level_mult,
        alpha: 1.0,
        stagnation_limit,
        pre_allocated_capacity: std::sync::atomic::AtomicUsize::new(0),
    })
}
/// Loads a `.vectors` file and returns its storage together with the vector count.
///
/// Returns `(None, 0)` when the header declares zero vectors or dimension zero.
/// Otherwise the declared payload (`count * dimension` little-endian `f32`s) is
/// first checked against the file's length. Only then is storage allocated, so
/// a corrupted header cannot request an arbitrarily large buffer.
///
/// # Errors
/// * I/O errors from opening or reading the file.
/// * `InvalidData` for an unsupported version.
/// * `InvalidData` when the file is too short for the payload its header
///   declares. Before this check, such a file failed later with
///   `UnexpectedEof`, after the allocation had already happened. Trailing
///   bytes beyond the payload are still accepted.
///
/// NOTE(review): the size check relies on `File::metadata().len()`, so
/// non-regular files (e.g. pipes) reporting length 0 are rejected.
fn load_vectors_file(
    path: &Path,
) -> std::io::Result<(Option<crate::perf_optimizations::ContiguousVectors>, usize)> {
    // version (u32) + count (u64) + dimension (u32)
    const HEADER_BYTES: u64 = 4 + 8 + 4;
    let file = File::open(path)?;
    let file_len = file.metadata()?.len();
    let mut reader = BufReader::new(file);
    let (count, dimension) = Self::read_vectors_header(&mut reader)?;
    if count == 0 || dimension == 0 {
        return Ok((None, 0));
    }
    let payload_bytes = (count as u64)
        .checked_mul(dimension as u64)
        .and_then(|scalars| scalars.checked_mul(4));
    let available = file_len.saturating_sub(HEADER_BYTES);
    match payload_bytes {
        Some(needed) if needed <= available => {}
        _ => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "vectors file truncated or corrupt: header declares {count} vectors \
                     of dimension {dimension}, but the file is {file_len} bytes"
                ),
            ));
        }
    }
    let storage = Self::read_vector_data(&mut reader, count, dimension)?;
    Ok((Some(storage), count))
}
/// Reads the `.vectors` header and returns `(count, dimension)`.
///
/// Layout (little-endian): version `u32` (must be 1), count `u64`, dimension `u32`.
/// Generic over any [`Read`] source; the loader passes its `BufReader<File>`.
///
/// # Errors
/// * `InvalidData` with message `Unsupported version: N` for any version other than 1.
/// * `InvalidData` if the count does not fit in `usize`. The original
///   `as usize` cast silently truncated it on 32-bit targets.
/// * `UnexpectedEof` (or other I/O errors) if the header is incomplete.
fn read_vectors_header<R: Read + ?Sized>(reader: &mut R) -> std::io::Result<(usize, usize)> {
    let mut buf4 = [0u8; 4];
    let mut buf8 = [0u8; 8];
    reader.read_exact(&mut buf4)?;
    let version = u32::from_le_bytes(buf4);
    if version != 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Unsupported version: {version}"),
        ));
    }
    reader.read_exact(&mut buf8)?;
    let raw_count = u64::from_le_bytes(buf8);
    let count = usize::try_from(raw_count).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("vector count {raw_count} does not fit in usize on this platform"),
        )
    })?;
    reader.read_exact(&mut buf4)?;
    let dimension = u32::from_le_bytes(buf4) as usize;
    Ok((count, dimension))
}
/// Reads `count` vectors of `dimension` little-endian `f32`s each into a new
/// `ContiguousVectors`.
///
/// The storage is created with `count.max(16)` as its second argument
/// (presumably an initial capacity). One row buffer is reused for every vector
/// before it is pushed.
///
/// # Errors
/// I/O errors (including `UnexpectedEof` on a short file); storage creation or
/// push failures are converted into `std::io::Error::other` with their message.
fn read_vector_data(
    reader: &mut BufReader<File>,
    count: usize,
    dimension: usize,
) -> std::io::Result<crate::perf_optimizations::ContiguousVectors> {
    let mut storage =
        crate::perf_optimizations::ContiguousVectors::new(dimension, count.max(16))
            .map_err(|e| std::io::Error::other(e.to_string()))?;
    let mut scalar = [0u8; 4];
    let mut row = vec![0f32; dimension];
    for _ in 0..count {
        row.iter_mut().try_for_each(|slot| -> std::io::Result<()> {
            reader.read_exact(&mut scalar)?;
            *slot = f32::from_le_bytes(scalar);
            Ok(())
        })?;
        storage
            .push(&row)
            .map_err(|e| std::io::Error::other(e.to_string()))?;
    }
    Ok(storage)
}
/// Opens a `.graph` file and parses its header followed by all layers.
///
/// # Errors
/// I/O errors and format errors from the header or layer readers.
fn load_graph_file(path: &Path) -> std::io::Result<LoadedGraph> {
    let mut reader = BufReader::new(File::open(path)?);
    let header = Self::read_graph_header(&mut reader)?;
    let layers = Self::read_graph_layers(&mut reader, header.num_layers)?;
    // The header carries every scalar field; only the (empty) layers are replaced.
    Ok(LoadedGraph { layers, ..header })
}
/// Validates the version word, then reads the remaining `.graph` header fields.
///
/// The returned `LoadedGraph` has an empty `layers` vector.
///
/// # Errors
/// `InvalidData` for an unsupported version; I/O errors while reading fields.
fn read_graph_header(reader: &mut BufReader<File>) -> std::io::Result<LoadedGraph> {
    Self::validate_graph_version(reader).and_then(|()| Self::read_graph_header_fields(reader))
}
/// Consumes the leading little-endian `u32` version word and accepts only version 1.
///
/// # Errors
/// `InvalidData` (`Unsupported graph version: N`) for other versions;
/// `UnexpectedEof` if fewer than 4 bytes are available.
fn validate_graph_version(reader: &mut BufReader<File>) -> std::io::Result<()> {
    const SUPPORTED: u32 = 1;
    let mut raw = [0u8; 4];
    reader.read_exact(&mut raw)?;
    match u32::from_le_bytes(raw) {
        SUPPORTED => Ok(()),
        version => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Unsupported graph version: {version}"),
        )),
    }
}
/// Reads the header fields that follow the version word, in on-disk order:
/// `num_layers`, `max_connections`, `max_connections_0`, `ef_construction`
/// (`u32` each), `entry_point` (`u64`), `max_layer` (`u32`), and a trailing
/// vector count (`u64`).
///
/// The trailing count is consumed so the reader lands on the layer data, but it
/// is not compared with anything. The returned `LoadedGraph` has empty `layers`.
fn read_graph_header_fields(reader: &mut BufReader<File>) -> std::io::Result<LoadedGraph> {
    // Struct-literal fields are evaluated in the order written, which matches the file layout.
    let graph = LoadedGraph {
        num_layers: read_u32_field(reader)?,
        max_connections: read_u32_field(reader)?,
        max_connections_0: read_u32_field(reader)?,
        ef_construction: read_u32_field(reader)?,
        entry_point: read_u64_field(reader)?,
        max_layer: read_u32_field(reader)?,
        layers: Vec::new(),
    };
    // NOTE(review): stored vector count, skipped without validation.
    let _ = read_u64_field(reader)?;
    Ok(graph)
}
/// Reads `num_layers` layers of adjacency data following the `.graph` header.
///
/// Per layer: node count (`u64`), then for each node its neighbour count
/// (`u32`) and that many neighbour ids (`u32`), all little-endian. Each node's
/// list is stored with `Layer::set_neighbors`.
///
/// Counts come straight from the file. Pre-allocation is therefore capped at
/// `MAX_PREALLOC` entries: a corrupt count can no longer trigger a huge
/// up-front allocation that aborts the process. A lying count now ends in an
/// `UnexpectedEof` error once the data runs out.
///
/// # Errors
/// * I/O errors, including `UnexpectedEof` on truncated data.
/// * `InvalidData` if a layer's node count does not fit in `usize`.
///
/// NOTE(review): `Layer::new(num_nodes)` still sizes itself from the
/// file-supplied node count.
fn read_graph_layers(
    reader: &mut BufReader<File>,
    num_layers: usize,
) -> std::io::Result<Vec<Layer>> {
    const MAX_PREALLOC: usize = 1024;
    let mut buf4 = [0u8; 4];
    let mut buf8 = [0u8; 8];
    let mut layers = Vec::with_capacity(num_layers.min(MAX_PREALLOC));
    for _ in 0..num_layers {
        reader.read_exact(&mut buf8)?;
        let raw_nodes = u64::from_le_bytes(buf8);
        let num_nodes = usize::try_from(raw_nodes).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("layer node count {raw_nodes} does not fit in usize on this platform"),
            )
        })?;
        let layer = Layer::new(num_nodes);
        for node_id in 0..num_nodes {
            reader.read_exact(&mut buf4)?;
            let num_neighbors = u32::from_le_bytes(buf4) as usize;
            let mut neighbors = Vec::with_capacity(num_neighbors.min(MAX_PREALLOC));
            for _ in 0..num_neighbors {
                reader.read_exact(&mut buf4)?;
                neighbors.push(u32::from_le_bytes(buf4) as usize);
            }
            layer.set_neighbors(node_id, neighbors);
        }
        layers.push(layer);
    }
    Ok(layers)
}
}
/// Exposes `NativeHnsw` through [`NativeHnswBackend`] by forwarding to its inherent methods.
impl<D: DistanceEngine + Send + Sync> NativeHnswBackend for NativeHnsw<D> {
    /// Forwards to `search_neighbours`.
    fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Vec<NativeNeighbour> {
        self.search_neighbours(query, k, ef_search)
    }

    /// Inserts `data.0` via the inherent `insert` and compares the assigned id
    /// with `data.1`. A mismatch is only logged at `warn` level; the call
    /// still succeeds.
    fn insert(&self, data: (&[f32], usize)) -> crate::error::Result<()> {
        let (vector, expected_idx) = data;
        let assigned_id = self.insert(vector)?;
        if assigned_id == expected_idx {
            return Ok(());
        }
        tracing::warn!(
            "NativeHnsw node_id mismatch: expected {expected_idx}, got {assigned_id}"
        );
        Ok(())
    }

    /// Forwards to the inherent batch insert.
    fn parallel_insert(&self, data: &[(&[f32], usize)]) -> crate::error::Result<Vec<usize>> {
        NativeHnsw::parallel_insert(self, data)
    }

    /// Forwards to the inherent (no-op) mode setter.
    fn set_searching_mode(&mut self, mode: bool) {
        NativeHnsw::set_searching_mode(self, mode);
    }

    /// Forwards to the inherent two-file dump.
    fn file_dump(&self, path: &Path, basename: &str) -> std::io::Result<()> {
        NativeHnsw::file_dump(self, path, basename)
    }

    /// Forwards to the inherent metric-dependent score transform.
    fn transform_score(&self, raw_distance: f32) -> f32 {
        NativeHnsw::transform_score(self, raw_distance)
    }

    /// Forwards to the inherent element count.
    fn len(&self) -> usize {
        NativeHnsw::len(self)
    }
}