use crate::distance::DistanceMetric;
use std::collections::HashMap;
use std::io::Write;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
/// Index-level settings persisted in `native_meta.bin`.
///
/// `generation` is stamped on every save and compared against the other
/// sidecar files on load to detect a save that did not complete.
pub(crate) struct HnswMeta {
    /// Vector dimensionality (passed to `ShardedVectors::new` on load).
    pub dimension: usize,
    /// Distance metric; stored on disk as a `u8` code.
    pub metric: DistanceMetric,
    /// Storage mode; stored on disk as a `u8` code.
    pub storage_mode: crate::StorageMode,
    /// Whether raw vectors are persisted to `native_vectors.bin`.
    pub enable_vector_storage: bool,
    /// Save generation; meta files from older layouts load with `0`.
    pub generation: u64,
}
/// Contents of `native_mappings.bin`: the bidirectional map between external
/// ids and internal slot indices.
pub(crate) struct HnswMappingsData {
    /// Save generation this snapshot belongs to (`0` for legacy files).
    pub generation: u64,
    /// Next internal index to hand out.
    pub next_idx: usize,
    /// External id -> internal index.
    pub id_to_idx: HashMap<u64, usize>,
    /// Internal index -> external id.
    pub idx_to_id: HashMap<usize, u64>,
}
/// Contents of `native_vectors.bin`: raw vectors keyed by internal index.
pub(crate) struct HnswVectorsData {
    /// Save generation this snapshot belongs to (`0` for legacy files).
    pub generation: u64,
    /// `(internal index, vector components)` pairs.
    pub vectors: Vec<(usize, Vec<f32>)>,
}
pub(crate) fn save_meta(path: &Path, meta: &HnswMeta) -> std::io::Result<()> {
let meta_path = path.join("native_meta.bin");
let bytes = postcard::to_allocvec(&(
meta.dimension,
meta.metric as u8,
meta.enable_vector_storage,
storage_mode_to_u8(meta.storage_mode),
meta.generation,
))
.map_err(std::io::Error::other)?;
atomic_write(&meta_path, &bytes)
}
pub(crate) fn load_meta(path: &Path) -> std::io::Result<HnswMeta> {
let meta_path = path.join("native_meta.bin");
let bytes = std::fs::read(meta_path)?;
if let Ok((dimension, metric_u8, enable_vector_storage, storage_mode_u8, generation)) =
postcard::from_bytes::<(usize, u8, bool, u8, u64)>(&bytes)
{
let metric = metric_from_u8(metric_u8)?;
let storage_mode = storage_mode_from_u8(storage_mode_u8);
return Ok(HnswMeta {
dimension,
metric,
enable_vector_storage,
storage_mode,
generation,
});
}
if let Ok((dimension, metric_u8, enable_vector_storage, storage_mode_u8)) =
postcard::from_bytes::<(usize, u8, bool, u8)>(&bytes)
{
let metric = metric_from_u8(metric_u8)?;
let storage_mode = storage_mode_from_u8(storage_mode_u8);
return Ok(HnswMeta {
dimension,
metric,
enable_vector_storage,
storage_mode,
generation: 0,
});
}
let (dimension, metric_u8, enable_vector_storage): (usize, u8, bool) =
postcard::from_bytes(&bytes).map_err(std::io::Error::other)?;
let metric = metric_from_u8(metric_u8)?;
Ok(HnswMeta {
dimension,
metric,
enable_vector_storage,
storage_mode: crate::StorageMode::Full,
generation: 0,
})
}
pub(crate) fn save_mappings(path: &Path, data: &HnswMappingsData) -> std::io::Result<()> {
let mappings_path = path.join("native_mappings.bin");
let bytes = postcard::to_allocvec(&(
&data.id_to_idx,
&data.idx_to_id,
data.next_idx,
data.generation,
))
.map_err(std::io::Error::other)?;
atomic_write(&mappings_path, &bytes)
}
/// Reads `<path>/native_mappings.bin`, accepting the current layout (with a
/// trailing generation) and the legacy layout without one.
///
/// Legacy files load with `generation = 0`.
///
/// # Errors
/// Read errors are returned as-is. If neither layout decodes, the legacy
/// layout's decode error is returned, wrapped with `io::Error::other`.
pub(crate) fn load_mappings(path: &Path) -> std::io::Result<HnswMappingsData> {
    type Current = (HashMap<u64, usize>, HashMap<usize, u64>, usize, u64);
    type Legacy = (HashMap<u64, usize>, HashMap<usize, u64>, usize);

    let bytes = std::fs::read(path.join("native_mappings.bin"))?;
    let (id_to_idx, idx_to_id, next_idx, generation) =
        match postcard::from_bytes::<Current>(&bytes) {
            Ok(current) => current,
            Err(_) => {
                let (forward, backward, next) = postcard::from_bytes::<Legacy>(&bytes)
                    .map_err(std::io::Error::other)?;
                (forward, backward, next, 0)
            }
        };
    Ok(HnswMappingsData {
        id_to_idx,
        idx_to_id,
        next_idx,
        generation,
    })
}
pub(crate) fn save_vectors(path: &Path, data: &HnswVectorsData) -> std::io::Result<()> {
let vectors_path = path.join("native_vectors.bin");
let bytes =
postcard::to_allocvec(&(&data.vectors, data.generation)).map_err(std::io::Error::other)?;
atomic_write(&vectors_path, &bytes)
}
/// Reads `<path>/native_vectors.bin`, accepting the current
/// `(vectors, generation)` layout and the legacy bare `vectors` layout.
///
/// Legacy files load with `generation = 0`.
///
/// # Errors
/// Read errors (including `NotFound`) are returned as-is. If neither layout
/// decodes, the legacy decode error is returned, wrapped with
/// `io::Error::other`.
pub(crate) fn load_vectors(path: &Path) -> std::io::Result<HnswVectorsData> {
    let bytes = std::fs::read(path.join("native_vectors.bin"))?;
    let (vectors, generation) = postcard::from_bytes::<(Vec<(usize, Vec<f32>)>, u64)>(&bytes)
        .or_else(|_| {
            postcard::from_bytes::<Vec<(usize, Vec<f32>)>>(&bytes)
                .map(|legacy| (legacy, 0))
                .map_err(std::io::Error::other)
        })?;
    Ok(HnswVectorsData {
        vectors,
        generation,
    })
}
/// Replaces `final_path` with `data` by writing to a sibling temp file and
/// renaming it into place.
///
/// The temp name combines the process id, the thread id and a per-process
/// counter, so concurrent writers never collide on it. If the write fails,
/// the temp file is removed on a best-effort basis and the original error is
/// returned.
fn atomic_write(final_path: &Path, data: &[u8]) -> std::io::Result<()> {
    static TMP_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let tmp_path = {
        let seq = TMP_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let tid = std::thread::current().id();
        let file_name = final_path.file_name().unwrap_or_default().to_string_lossy();
        final_path.with_file_name(format!("{file_name}.tmp.{pid}.{tid:?}.{seq}"))
    };

    atomic_write_inner(&tmp_path, final_path, data).map_err(|err| {
        // Cleanup failure is irrelevant; the caller needs the write error.
        let _ = std::fs::remove_file(&tmp_path);
        err
    })
}
/// Writes `data` to `tmp_path`, syncs it to stable storage, then renames it
/// over `final_path`.
///
/// Syncing the file contents alone does not make the rename durable: the new
/// directory entry lives in the parent directory. That directory must also be
/// fsynced, otherwise a power loss can revert `final_path` to its old contents
/// (or leave it missing) even though this function returned `Ok`. On Unix
/// this is done after the rename; other targets rely on the platform's own
/// rename durability.
///
/// # Errors
/// Returns any error from creating, writing or syncing the temp file, from
/// the rename, or from syncing the parent directory. If the error occurs
/// before the rename, `tmp_path` may be left behind and should be removed by
/// the caller.
fn atomic_write_inner(tmp_path: &Path, final_path: &Path, data: &[u8]) -> std::io::Result<()> {
    let mut file = std::fs::File::create(tmp_path)?;
    // A single `write_all` of the whole buffer; buffering would add nothing.
    file.write_all(data)?;
    file.sync_all()?;
    // Close the handle before renaming so no open handle refers to the old name.
    drop(file);
    std::fs::rename(tmp_path, final_path)?;
    sync_parent_dir(final_path)
}

/// Fsyncs the directory containing `path` so a just-completed rename is durable.
#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    // `Path::parent` yields `Some("")` for a bare file name, meaning the CWD.
    let dir = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    std::fs::File::open(dir)?.sync_all()
}

/// Directory fsync cannot be expressed through `std` on non-Unix targets, so
/// this is a no-op there.
#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
/// Loads the persisted vectors if `meta` says vector storage is enabled.
///
/// Returns `(vectors, storage_enabled, vectors_generation)`:
/// - Storage disabled in `meta`: an empty store, `false`, `0`.
/// - `native_vectors.bin` missing: an empty store, `false`, `0`. Storage is
///   downgraded to disabled rather than failing the load.
/// - Otherwise: the loaded vectors, `true`, and the file's generation.
///
/// # Errors
/// Any `load_vectors` error other than `NotFound` is propagated.
pub(crate) fn load_vectors_or_disable(
    path: &Path,
    meta: &HnswMeta,
) -> std::io::Result<(super::sharded_vectors::ShardedVectors, bool, u64)> {
    use super::sharded_vectors::ShardedVectors;

    let disabled = || (ShardedVectors::new(meta.dimension), false, 0_u64);

    if !meta.enable_vector_storage {
        return Ok(disabled());
    }

    let loaded = match load_vectors(path) {
        Ok(data) => data,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            tracing::debug!(
                "native_vectors.bin missing during HNSW load; disabling vector storage for safety"
            );
            return Ok(disabled());
        }
        Err(err) => return Err(err),
    };

    let store = ShardedVectors::new(meta.dimension);
    store.insert_batch(loaded.vectors);
    Ok((store, true, loaded.generation))
}
/// Persists the vector sidecar for `generation`, or deletes a stale one.
///
/// When `enable_vector_storage` is true, every stored vector is written to
/// `native_vectors.bin`, stamped with `generation`. Otherwise any existing
/// `native_vectors.bin` is deleted, and an already-absent file counts as
/// success.
///
/// # Errors
/// Propagates errors from `save_vectors`, and any removal error other than
/// `NotFound`.
pub(crate) fn save_or_cleanup_vectors(
    path: &Path,
    enable_vector_storage: bool,
    vectors: &super::sharded_vectors::ShardedVectors,
    generation: u64,
) -> std::io::Result<()> {
    if enable_vector_storage {
        return save_vectors(
            path,
            &HnswVectorsData {
                vectors: vectors.collect_for_parallel(),
                generation,
            },
        );
    }

    // Remove unconditionally instead of checking `exists()` first. The check
    // was racy, and `exists()` reports `false` on permission errors or a
    // broken symlink, which silently left the file in place.
    match std::fs::remove_file(path.join("native_vectors.bin")) {
        Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err),
        _ => Ok(()),
    }
}
/// Returns the generation recorded in `native_meta.bin`, or `None` if that
/// file does not exist yet.
///
/// # Errors
/// Any `load_meta` error other than `NotFound` is propagated.
fn read_current_generation(path: &Path) -> std::io::Result<Option<u64>> {
    load_meta(path)
        .map(|meta| Some(meta.generation))
        .or_else(|err| {
            if err.kind() == std::io::ErrorKind::NotFound {
                Ok(None)
            } else {
                Err(err)
            }
        })
}
/// Computes the generation number for the next save.
///
/// This is one past the generation on disk (saturating at `u64::MAX`), or `1`
/// when no meta file exists yet.
///
/// # Errors
/// Propagates errors from reading the current meta file.
pub(crate) fn next_generation(path: &Path) -> std::io::Result<u64> {
    let current = read_current_generation(path)?;
    Ok(current.map_or(1, |generation| generation.saturating_add(1)))
}
pub(crate) fn save_graph_generation(path: &Path, generation: u64) -> std::io::Result<()> {
let marker_path = path.join("native_hnsw.gen");
let bytes = postcard::to_allocvec(&generation).map_err(std::io::Error::other)?;
atomic_write(&marker_path, &bytes)
}
pub(crate) fn load_graph_generation(path: &Path) -> std::io::Result<u64> {
let marker_path = path.join("native_hnsw.gen");
match std::fs::read(&marker_path) {
Ok(bytes) => postcard::from_bytes::<u64>(&bytes).map_err(std::io::Error::other),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(0),
Err(err) => Err(err),
}
}
/// Writes the mappings, the vectors (or removes them) and the meta file, all
/// stamped with `new_gen`.
///
/// Write order is mappings, then vectors, then meta. Meta is written last, so
/// a crash midway leaves a meta generation that no longer matches the
/// already-rewritten sidecars. `load_sidecars` reports that mismatch.
///
/// # Errors
/// Stops at, and returns, the first failing write.
pub(crate) fn save_sidecars(
    path: &Path,
    mappings: &super::sharded_mappings::ShardedMappings,
    vectors: &super::sharded_vectors::ShardedVectors,
    meta: &HnswMeta,
    new_gen: u64,
) -> std::io::Result<()> {
    {
        // Scoped so the cloned maps are dropped before the vectors are written.
        let (id_to_idx, idx_to_id, next_idx) = mappings.as_parts();
        let snapshot = HnswMappingsData {
            id_to_idx,
            idx_to_id,
            next_idx,
            generation: new_gen,
        };
        save_mappings(path, &snapshot)?;
    }

    save_or_cleanup_vectors(path, meta.enable_vector_storage, vectors, new_gen)?;

    save_meta(
        path,
        &HnswMeta {
            generation: new_gen,
            ..*meta
        },
    )
}
pub(crate) fn load_sidecars(
path: &Path,
meta: &HnswMeta,
) -> std::io::Result<(
super::sharded_mappings::ShardedMappings,
super::sharded_vectors::ShardedVectors,
bool,
)> {
let graph_generation = load_graph_generation(path)?;
if graph_generation != meta.generation {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"incomplete save detected: graph generation {} but meta generation {} \
(crash between graph dump and sidecar writes, database state inconsistent)",
graph_generation, meta.generation,
),
));
}
let mappings_data = load_mappings(path)?;
if mappings_data.generation != meta.generation {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"incomplete save detected: mappings generation {} but meta generation {} \
(crash between sidecar writes, database state inconsistent)",
mappings_data.generation, meta.generation,
),
));
}
let (vectors, enable_vector_storage, vectors_generation) = load_vectors_or_disable(path, meta)?;
if enable_vector_storage && vectors_generation != meta.generation {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"incomplete save detected: vectors generation {} but meta generation {} \
(crash between sidecar writes, database state inconsistent)",
vectors_generation, meta.generation,
),
));
}
let mappings = super::sharded_mappings::ShardedMappings::from_parts(
mappings_data.id_to_idx,
mappings_data.idx_to_id,
mappings_data.next_idx,
);
Ok((mappings, vectors, enable_vector_storage))
}
/// Decodes an on-disk metric code (`0..=4`) into a [`DistanceMetric`].
///
/// # Errors
/// Any other code yields an `InvalidData` error.
fn metric_from_u8(value: u8) -> std::io::Result<DistanceMetric> {
    let decoded = match value {
        0 => Some(DistanceMetric::Cosine),
        1 => Some(DistanceMetric::Euclidean),
        2 => Some(DistanceMetric::DotProduct),
        3 => Some(DistanceMetric::Hamming),
        4 => Some(DistanceMetric::Jaccard),
        _ => None,
    };
    decoded.ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, "Unknown distance metric")
    })
}
const fn storage_mode_to_u8(mode: crate::StorageMode) -> u8 {
match mode {
crate::StorageMode::Full => 0,
crate::StorageMode::SQ8 => 1,
crate::StorageMode::Binary => 2,
crate::StorageMode::ProductQuantization => 3,
crate::StorageMode::RaBitQ => 4,
}
}
const fn storage_mode_from_u8(value: u8) -> crate::StorageMode {
match value {
1 => crate::StorageMode::SQ8,
2 => crate::StorageMode::Binary,
3 => crate::StorageMode::ProductQuantization,
4 => crate::StorageMode::RaBitQ,
_ => crate::StorageMode::Full,
}
}