use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use anyhow::{anyhow, Result};
use tokio::sync::RwLock;
use usearch::{Index, IndexOptions, MetricKind};
use super::super::store_config::{MmapServeMode, VectorQuant};
use super::types::StoreKeyMap;
/// Floor applied to capacity hints before the `TRUSTY_MAX_CHUNKS` cap is
/// applied; also the hint `UsearchStore::new` passes to `with_capacity_hint`.
pub(super) const INITIAL_CAPACITY: usize = 64;
/// Element cap returned by `hnsw_max_elements` when `TRUSTY_MAX_CHUNKS` is
/// unset, does not parse as a `usize`, or is zero.
const DEFAULT_HNSW_MAX_ELEMENTS: usize = 1_000_000;
/// Returns the maximum number of elements the index is allowed to hold.
///
/// The `TRUSTY_MAX_CHUNKS` environment variable is consulted on every call.
/// Its value is honoured only when it parses as a `usize` strictly greater
/// than zero; in every other case `DEFAULT_HNSW_MAX_ELEMENTS` is returned.
pub(super) fn hnsw_max_elements() -> usize {
    match std::env::var("TRUSTY_MAX_CHUNKS") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(limit) if limit > 0 => limit,
            // Unparsable or zero: fall back to the built-in default.
            _ => DEFAULT_HNSW_MAX_ELEMENTS,
        },
        // Unset (or not valid unicode): fall back to the built-in default.
        Err(_) => DEFAULT_HNSW_MAX_ELEMENTS,
    }
}
/// Checks that an embedding is usable as an index vector.
///
/// # Errors
///
/// Returns a static description of the problem when:
/// * any component is NaN or infinite (reported at the first such component), or
/// * the squared L2 norm, accumulated in `f32`, is below `1e-12` — this
///   includes the empty slice.
pub(super) fn validate_embedding(v: &[f32]) -> std::result::Result<(), &'static str> {
    // Left-to-right fold: bails out on the first non-finite component,
    // otherwise accumulates the sum of squares.
    let norm_sq = v.iter().try_fold(0.0f32, |acc, &component| {
        if component.is_finite() {
            Ok(acc + component * component)
        } else {
            Err("contains a non-finite component (NaN or infinity)")
        }
    })?;
    if norm_sq < 1e-12 {
        Err("is an all-zero (degenerate) vector")
    } else {
        Ok(())
    }
}
/// Vector store backed by a usearch `Index`, plus the bookkeeping needed to
/// map caller-facing string ids to the `u64` keys stored in the index.
///
/// Every piece of mutable state sits behind an `Arc`, either in an async
/// `RwLock` or as an atomic.
pub struct UsearchStore {
/// The usearch index itself.
pub(super) index: Arc<RwLock<Index>>,
/// Forward map: string id -> `u64` index key. This is the map persisted to
/// the `keys.json` sidecar by `save`.
pub(super) id_to_key: Arc<RwLock<HashMap<String, u64>>>,
/// Reverse map: `u64` index key -> string id. Rebuilt from the sidecar's
/// forward map in `load_from`.
pub(super) key_to_id: Arc<RwLock<HashMap<u64, String>>>,
/// Key counter; starts at 1 for a fresh store and is restored from the
/// sidecar on load. Presumably the next key to hand out — the allocating
/// code is not in this part of the file.
pub(super) next_key: Arc<AtomicU64>,
/// Vector dimensionality the index was created with.
pub(super) dim: usize,
/// `true` while the index is serving a snapshot opened with `Index::view`
/// (set in `load_from`, cleared by `promote_view_to_mutable`).
pub(super) is_view: Arc<AtomicBool>,
/// Snapshot path the index was loaded from, if any; used to detect no-op
/// saves and as the source when promoting a view to a mutable index.
pub(super) hnsw_path: Arc<RwLock<Option<PathBuf>>>,
}
impl UsearchStore {
/// Creates an empty store for `dim`-dimensional vectors.
///
/// Equivalent to `with_capacity_hint(dim, INITIAL_CAPACITY)`.
///
/// # Errors
///
/// Propagates any error from `with_capacity_hint`.
pub fn new(dim: usize) -> Result<Self> {
Self::with_capacity_hint(dim, INITIAL_CAPACITY)
}
/// Creates an empty, mutable store sized for roughly `expected_chunks` vectors.
///
/// The index uses the cosine metric, single-vector keys (`multi: false`) and
/// the quantization selected by `VectorQuant::from_env()`. When more than
/// 50 000 chunks are expected, explicit HNSW parameters are supplied
/// (connectivity 32, expansion-add 128, expansion-search 64); otherwise all
/// three are passed as 0 (presumably "use usearch's built-in defaults" —
/// confirm against the usearch documentation).
///
/// The initial reservation is `expected_chunks`, raised to at least
/// `INITIAL_CAPACITY` and then capped at `hnsw_max_elements()`.
///
/// # Errors
///
/// Fails if usearch cannot create the index or reserve the initial capacity.
pub fn with_capacity_hint(dim: usize, expected_chunks: usize) -> Result<Self> {
    let large_corpus = expected_chunks > 50_000;
    let options = IndexOptions {
        dimensions: dim,
        metric: MetricKind::Cos,
        quantization: VectorQuant::from_env().scalar_kind(),
        connectivity: if large_corpus { 32 } else { 0 },
        expansion_add: if large_corpus { 128 } else { 0 },
        expansion_search: if large_corpus { 64 } else { 0 },
        multi: false,
    };

    let index = match Index::new(&options) {
        Ok(created) => created,
        Err(e) => return Err(anyhow!("usearch Index::new failed: {e}")),
    };

    // Floor first, cap second: if the configured cap is below the floor, the cap wins.
    let floored = std::cmp::max(expected_chunks, INITIAL_CAPACITY);
    let reserve_target = std::cmp::min(floored, hnsw_max_elements());
    if let Err(e) = index.reserve(reserve_target) {
        return Err(anyhow!("usearch reserve failed: {e}"));
    }

    let store = Self {
        index: Arc::new(RwLock::new(index)),
        id_to_key: Arc::new(RwLock::new(HashMap::new())),
        key_to_id: Arc::new(RwLock::new(HashMap::new())),
        next_key: Arc::new(AtomicU64::new(1)),
        dim,
        is_view: Arc::new(AtomicBool::new(false)),
        hnsw_path: Arc::new(RwLock::new(None)),
    };
    Ok(store)
}
/// Returns the vector dimensionality this store was created with.
pub fn dim(&self) -> usize {
self.dim
}
/// Reports whether the index is currently in view mode, i.e. the `is_view`
/// flag is set (it is set by `load_from` and cleared on promotion).
#[doc(hidden)]
pub fn in_view_mode(&self) -> bool {
self.is_view.load(Ordering::Acquire)
}
/// Persists the index to `hnsw_path` and the id→key map to a `keys.json`
/// sidecar next to it (`hnsw_path` with its extension replaced).
///
/// * If the index is in view mode and `hnsw_path` is the snapshot being
///   viewed, nothing is written and `Ok(())` is returned.
/// * If the index is in view mode but the target is a different path, the
///   index is first promoted to a mutable in-memory copy.
///
/// Both files are staged as temporaries and only then renamed into place,
/// index first and sidecar second. All fallible preparation (serialization,
/// writing both temporaries) therefore happens before the existing snapshot
/// pair is touched, so a failure there leaves the previous pair intact.
/// Temporaries are removed on a best-effort basis when the save fails.
///
/// NOTE(review): the key map is captured before the index lock is taken, so a
/// concurrent mutation between the two could leave the saved pair out of
/// step. Closing that gap needs a lock ordering agreed with the mutation
/// paths, which are not in this part of the file.
///
/// # Errors
///
/// Fails if promotion fails, the temporary path is not valid UTF-8, the
/// parent directory cannot be created, the key map cannot be serialized,
/// usearch fails to write the index, or a file write or rename fails.
pub async fn save(&self, hnsw_path: &Path) -> Result<()> {
    if self.is_view.load(Ordering::Acquire) {
        let same_path = {
            let guard = self.hnsw_path.read().await;
            guard.as_deref() == Some(hnsw_path)
        };
        if same_path {
            tracing::debug!(
                "usearch: skipping save for {} — index is in view mode, snapshot is clean",
                hnsw_path.display()
            );
            return Ok(());
        }
        self.ensure_mutable().await?;
    }

    let key_map = {
        let id_to_key = self.id_to_key.read().await;
        StoreKeyMap {
            id_to_key: id_to_key.clone(),
            next_key: self.next_key.load(Ordering::Relaxed),
            dim: self.dim,
        }
    };
    // Serialize before touching the filesystem: if this fails, nothing on
    // disk has changed.
    let json =
        serde_json::to_vec(&key_map).map_err(|e| anyhow!("serialize hnsw key map: {e}"))?;

    if let Some(parent) = hnsw_path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| anyhow!("create parent of {}: {e}", hnsw_path.display()))?;
    }

    let tmp_hnsw = hnsw_path.with_extension("usearch.tmp");
    let tmp_hnsw_str = tmp_hnsw
        .to_str()
        .ok_or_else(|| anyhow!("non-utf8 path: {}", tmp_hnsw.display()))?;
    let sidecar = hnsw_path.with_extension("keys.json");
    let sidecar_tmp = sidecar.with_extension("json.tmp");

    // Stage 1: write both temporaries. The live snapshot pair is untouched.
    let index_staged = {
        let index = self.index.write().await;
        let outcome = index
            .save(tmp_hnsw_str)
            .map_err(|e| anyhow!("usearch save failed: {e}"));
        outcome
    };
    let staged = index_staged.and_then(|()| {
        std::fs::write(&sidecar_tmp, &json)
            .map_err(|e| anyhow!("write hnsw key sidecar tmp: {e}"))
    });

    // Stage 2: publish with two back-to-back renames (index, then sidecar).
    let published = staged
        .and_then(|()| {
            std::fs::rename(&tmp_hnsw, hnsw_path)
                .map_err(|e| anyhow!("rename hnsw snapshot: {e}"))
        })
        .and_then(|()| {
            std::fs::rename(&sidecar_tmp, &sidecar)
                .map_err(|e| anyhow!("rename hnsw key sidecar: {e}"))
        });

    if published.is_err() {
        // Best-effort cleanup; a temporary may already be gone (e.g. it was
        // renamed), so removal errors are deliberately ignored.
        let _ = std::fs::remove_file(&tmp_hnsw);
        let _ = std::fs::remove_file(&sidecar_tmp);
    }
    published
}
/// Opens a previously saved snapshot at `hnsw_path` together with its
/// `keys.json` sidecar.
///
/// Returns `Ok(None)` when there is no usable snapshot: either file is
/// missing, the sidecar cannot be read or parsed, the path is not valid
/// UTF-8, or usearch refuses to view the file. All but the "missing" case
/// log a warning.
///
/// On success the index is opened with `Index::view`, the store is flagged
/// as being in view mode, and both id maps are rebuilt from the sidecar. If
/// `MmapServeMode::from_env().promote_on_load()` is true the view is
/// promoted to a mutable index before returning.
///
/// The restored key counter is the sidecar's `next_key`, raised if necessary
/// so it is at least 1 and strictly greater than every key in the restored
/// map. A sidecar whose `next_key` is missing or stale therefore cannot make
/// the store hand out a key that is already mapped.
///
/// # Errors
///
/// Fails if the store cannot be constructed for the sidecar's dimension and
/// size, or if promote-on-load is requested and promotion fails.
pub async fn load_from(hnsw_path: &Path) -> Result<Option<Self>> {
    let sidecar = hnsw_path.with_extension("keys.json");
    if !hnsw_path.exists() || !sidecar.exists() {
        return Ok(None);
    }

    let json = match std::fs::read(&sidecar) {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!(
                "could not read hnsw key sidecar {}: {e} — discarding snapshot",
                sidecar.display()
            );
            return Ok(None);
        }
    };
    let key_map: StoreKeyMap = match serde_json::from_slice(&json) {
        Ok(m) => m,
        Err(e) => {
            tracing::warn!(
                "hnsw key sidecar {} is corrupt ({e}) — discarding snapshot",
                sidecar.display()
            );
            return Ok(None);
        }
    };

    let expected_chunks = key_map.id_to_key.len();
    let store = Self::with_capacity_hint(key_map.dim, expected_chunks)?;

    let hnsw_str = match hnsw_path.to_str() {
        Some(s) => s,
        None => {
            tracing::warn!(
                "non-utf8 hnsw path {} — discarding snapshot",
                hnsw_path.display()
            );
            return Ok(None);
        }
    };
    {
        let index = store.index.write().await;
        if let Err(e) = index.view(hnsw_str) {
            tracing::warn!(
                "usearch failed to view {} ({e}) — discarding snapshot",
                hnsw_path.display()
            );
            return Ok(None);
        }
    }
    store.is_view.store(true, Ordering::Release);
    *store.hnsw_path.write().await = Some(hnsw_path.to_path_buf());

    // Rebuild both directions of the id map and remember the largest key seen.
    let mut highest_key: Option<u64> = None;
    {
        let mut id_map = store.id_to_key.write().await;
        let mut key_map_rev = store.key_to_id.write().await;
        id_map.reserve(expected_chunks);
        key_map_rev.reserve(expected_chunks);
        for (id, key) in &key_map.id_to_key {
            id_map.insert(id.clone(), *key);
            key_map_rev.insert(*key, id.clone());
            highest_key = highest_key.max(Some(*key));
        }
    }

    // Never resume the counter at or below a key that is already in use.
    let next_key_floor = highest_key.map_or(1, |top| top.saturating_add(1));
    if let Some(top) = highest_key {
        if key_map.next_key <= top {
            tracing::warn!(
                "hnsw key sidecar {} has next_key {} but contains key {} — raising next_key to avoid key reuse",
                sidecar.display(),
                key_map.next_key,
                top
            );
        }
    }
    store
        .next_key
        .store(key_map.next_key.max(next_key_floor), Ordering::Relaxed);

    if MmapServeMode::from_env().promote_on_load() {
        store.promote_view_to_mutable().await?;
    }
    Ok(Some(store))
}
/// Makes sure the index can be mutated.
///
/// Does nothing when the store is not in view mode; otherwise delegates to
/// `promote_view_to_mutable` and returns its result.
pub(super) async fn ensure_mutable(&self) -> Result<()> {
    let viewing = self.is_view.load(Ordering::Acquire);
    if viewing {
        self.promote_view_to_mutable().await
    } else {
        Ok(())
    }
}
/// Replaces a viewed snapshot with a fully loaded, mutable copy of the same
/// file.
///
/// The call is idempotent: if the store is not in view mode it returns
/// `Ok(())` without requiring a source path. The view flag is checked once
/// up front and again after the index write lock is acquired, so when
/// several tasks race here only the first one performs the load.
///
/// After loading, capacity is raised to at least the number of stored
/// vectors (and at least `INITIAL_CAPACITY`) if the loaded index reports
/// less, and the view flag is cleared.
///
/// NOTE(review): if `Index::load` fails, the view flag stays set but the
/// state of the underlying index is whatever usearch leaves behind — confirm
/// against the usearch documentation before relying on the store afterwards.
///
/// # Errors
///
/// Fails if the store is in view mode but has no recorded source path, the
/// path is not valid UTF-8, or usearch fails to load or reserve.
pub(super) async fn promote_view_to_mutable(&self) -> Result<()> {
    // Fast path: an already-mutable store needs no source path, so do not
    // report a missing one as a "view mode" error.
    if !self.is_view.load(Ordering::Acquire) {
        return Ok(());
    }

    let path = {
        let guard = self.hnsw_path.read().await;
        guard.clone()
    };
    let path = path.ok_or_else(|| {
        anyhow!("usearch index is in view mode but has no source path to promote from")
    })?;
    let path_str = path
        .to_str()
        .ok_or_else(|| anyhow!("non-utf8 hnsw path: {}", path.display()))?
        .to_string();

    let index = self.index.write().await;
    // Re-check under the lock: another task may have promoted while we waited.
    if !self.is_view.load(Ordering::Acquire) {
        return Ok(());
    }
    index
        .load(&path_str)
        .map_err(|e| anyhow!("usearch failed to promote view → mutable load: {e}"))?;

    let size = index.size();
    if index.capacity() < size {
        index
            .reserve(size.max(INITIAL_CAPACITY))
            .map_err(|e| anyhow!("usearch reserve after promote failed: {e}"))?;
    }
    self.is_view.store(false, Ordering::Release);
    tracing::info!(
        "usearch: promoted view → mutable for {} ({} vectors)",
        path.display(),
        size
    );
    Ok(())
}
/// Makes room for one more vector, growing the index if it is full.
///
/// When the index already holds `hnsw_max_elements()` vectors or more, the
/// call is refused. Otherwise, if there is no spare slot, capacity is
/// doubled (treating a capacity of 0 as 1) and capped at
/// `hnsw_max_elements()`.
///
/// # Errors
///
/// Fails when the element cap has been reached or usearch cannot reserve
/// the larger capacity.
pub(super) fn ensure_capacity(index: &Index) -> Result<()> {
    let stored = index.size();
    let capacity = index.capacity();
    let limit = hnsw_max_elements();

    if stored >= limit {
        return Err(anyhow!(
            "usearch index at TRUSTY_MAX_CHUNKS cap ({} elements) — refusing further upserts",
            limit
        ));
    }
    // A free slot already exists; nothing to do.
    if stored < capacity {
        return Ok(());
    }

    let target = capacity.max(1).saturating_mul(2).min(limit);
    index
        .reserve(target)
        .map_err(|e| anyhow!("usearch reserve grow failed: {e}"))
}
}