use crate::index::HnswIndex;
use crate::storage::{MmapStorage, PayloadStorage, VectorStorage};
use parking_lot::RwLock;
use std::sync::Arc;
/// Re-inserts vectors that are present in vector storage but missing from the
/// HNSW index, returning the count reported by the index insert.
///
/// The full id scan is skipped when storage is empty or when storage and index
/// report the same number of entries. Equal counts only prove equal id sets if
/// every indexed id also exists in storage, so orphaned index ids should be
/// removed before calling this when that cannot otherwise be guaranteed.
///
/// Gap ids with no stored vector are skipped silently; stored vectors whose
/// length differs from `dimension` are skipped with a warning. The storage
/// read lock is released before the index insert.
///
/// # Errors
///
/// Returns an error if reading any gap vector from storage fails.
pub(crate) fn recover_hnsw_gap(
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
    dimension: usize,
) -> crate::error::Result<usize> {
    // Everything that needs the storage lock happens inside this scope, so the
    // read guard is gone before the (potentially slow) index insert below.
    let (vectors, gap_total) = {
        let guard = vector_storage.read();
        let (stored, indexed) = (guard.len(), index.len());
        if stored == 0 || stored == indexed {
            tracing::debug!(
                storage_count = stored,
                hnsw_count = indexed,
                "gap recovery skipped — counts match, no scan needed"
            );
            return Ok(0);
        }

        let missing = find_gap_ids(&guard, index);
        if missing.is_empty() {
            return Ok(0);
        }
        let vectors = retrieve_valid_vectors(&guard, &missing, dimension)?;
        (vectors, missing.len())
    };

    let recovered = reindex_vectors(index, &vectors);
    tracing::warn!(
        recovered,
        gap_total,
        "Crash recovery: re-indexed gap vectors into HNSW"
    );
    Ok(recovered)
}
/// Returns the ids held in `storage` that have no mapping in `index`, in the
/// order `storage.ids()` yields them.
fn find_gap_ids(storage: &MmapStorage, index: &HnswIndex) -> Vec<u64> {
    let mut missing: Vec<u64> = Vec::new();
    for id in storage.ids() {
        if !index.mappings.contains(id) {
            missing.push(id);
        }
    }
    missing
}
fn retrieve_valid_vectors(
storage: &MmapStorage,
gap_ids: &[u64],
dimension: usize,
) -> crate::error::Result<Vec<(u64, Vec<f32>)>> {
let mut vectors = Vec::with_capacity(gap_ids.len());
for &id in gap_ids {
match storage.retrieve(id) {
Ok(Some(v)) if v.len() == dimension => vectors.push((id, v)),
Ok(Some(v)) => tracing::warn!(
id,
expected = dimension,
actual = v.len(),
"Skipping gap vector with mismatched dimension"
),
Ok(None) => {} Err(e) => {
return Err(crate::error::Error::Storage(format!(
"failed to retrieve gap vector {id}: {e}"
)))
}
}
}
Ok(vectors)
}
/// Submits `vectors` to `index` as one parallel batch and returns the count
/// reported by `insert_batch_parallel`.
///
/// An empty slice returns 0 without calling into the index.
fn reindex_vectors(index: &HnswIndex, vectors: &[(u64, Vec<f32>)]) -> usize {
    match vectors {
        [] => 0,
        items => {
            let batch: Vec<(u64, &[f32])> = items
                .iter()
                .map(|(id, vector)| (*id, &vector[..]))
                .collect();
            index.insert_batch_parallel(batch)
        }
    }
}
use crate::collection::types::CollectionConfig;
use crate::error::{Error, Result};
use crate::storage::LogPayloadStorage;
pub(super) fn load_config(path: &std::path::Path) -> Result<CollectionConfig> {
let config_path = path.join("config.json");
let config_data = std::fs::read_to_string(&config_path)?;
let config: CollectionConfig =
serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
validate_schema_version(&config)?;
Ok(config)
}
fn validate_schema_version(config: &CollectionConfig) -> Result<()> {
use crate::collection::types::CURRENT_SCHEMA_VERSION;
let version = if config.schema_version == 0 {
1
} else {
config.schema_version
};
if version > CURRENT_SCHEMA_VERSION {
return Err(Error::IncompatibleSchemaVersion {
found: version,
supported: CURRENT_SCHEMA_VERSION,
});
}
Ok(())
}
/// Returns the point count the collection should report after opening.
///
/// Metadata-only collections count the ids in payload storage. All other
/// collections count the entries in vector storage.
pub(super) fn reconcile_point_count(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    payload_storage: &Arc<RwLock<LogPayloadStorage>>,
) -> usize {
    if !config.metadata_only {
        return vector_storage.read().len();
    }
    payload_storage.read().ids().len()
}
/// Reconciles the HNSW index with vector storage when a collection is opened.
///
/// Three passes run in this order:
/// 1. `remove_orphan_ids` drops index entries whose id is absent from vector
///    storage.
/// 2. `recover_hnsw_gap` inserts stored vectors that are missing from the index.
/// 3. `reindex_stale_wal_ids` re-inserts WAL-touched vectors whose indexed copy
///    differs from storage.
///
/// Orphans must be removed *before* gap recovery. Gap recovery skips its full
/// scan when storage and index counts are equal, and equal counts only imply
/// identical id sets once every indexed id is known to exist in storage.
/// Running gap recovery first would let one orphan plus one unindexed vector
/// produce equal counts, skip the scan, and leave that vector permanently
/// missing from the index after the orphan is deleted.
///
/// Metadata-only collections and collections with `dimension == 0` are skipped.
///
/// Returns `true` if any pass changed the index.
///
/// # Errors
///
/// Propagates storage read errors from gap recovery or stale-vector reindexing.
#[cfg(feature = "persistence")]
pub(super) fn run_crash_recovery(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
    wal_touched_ids: &[u64],
) -> Result<bool> {
    if config.metadata_only || config.dimension == 0 {
        return Ok(false);
    }
    // Orphan removal first: afterwards the index ids are a subset of storage
    // ids, so the count-based fast path in gap recovery is sound.
    let orphans = remove_orphan_ids(vector_storage, index);
    let recovered = recover_hnsw_gap(vector_storage, index, config.dimension)?;
    let stale = reindex_stale_wal_ids(vector_storage, index, wal_touched_ids, config.dimension)?;

    let changed = recovered + orphans + stale > 0;
    if changed {
        tracing::info!(
            collection = %config.name,
            recovered,
            orphans,
            stale,
            "Collection index reconciliation completed on open"
        );
    }
    Ok(changed)
}
/// Stand-in used when the `persistence` feature is disabled.
///
/// No reconciliation pass runs. The function always returns `Ok(false)`,
/// meaning the index was not changed.
#[cfg(not(feature = "persistence"))]
pub(super) fn run_crash_recovery(
    _config: &CollectionConfig,
    _vector_storage: &Arc<RwLock<MmapStorage>>,
    _index: &Arc<HnswIndex>,
    _wal_touched_ids: &[u64],
) -> Result<bool> {
    // The reconciliation passes are only compiled with `persistence`.
    Ok(false)
}
/// Removes every id that has a mapping in `index` but no vector in
/// `vector_storage`, returning how many ids were passed to `index.remove`.
///
/// Returns 0 immediately when the index is empty. The storage read lock is
/// held only while the storage id set is collected.
#[cfg(feature = "persistence")]
fn remove_orphan_ids(vector_storage: &Arc<RwLock<MmapStorage>>, index: &Arc<HnswIndex>) -> usize {
    if index.is_empty() {
        return 0;
    }

    let known: std::collections::HashSet<u64> = vector_storage.read().ids().into_iter().collect();

    // Snapshot the orphans first so the mapping iterator is finished before
    // any removal touches the index.
    let mut orphans: Vec<u64> = Vec::new();
    for (id, _) in index.mappings.iter() {
        if !known.contains(&id) {
            orphans.push(id);
        }
    }

    let removed = orphans.len();
    for id in orphans {
        index.remove(id);
    }

    if removed > 0 {
        tracing::warn!(
            orphans = removed,
            "Crash recovery: removed HNSW ids absent from vector storage"
        );
    }
    removed
}
/// Re-inserts WAL-touched vectors whose indexed copy no longer matches vector
/// storage, returning the count reported by the index insert.
///
/// For each *distinct* id in `wal_touched_ids` that already has an index
/// mapping, the stored vector is compared element-wise with the indexed one.
/// Vectors that differ are collected and re-inserted in one batch, as are
/// vectors for which `with_vector` yields no result. Ids with no mapping, no
/// stored vector, or a stored vector whose length differs from `dimension`
/// are skipped.
///
/// Repeated ids in `wal_touched_ids` are processed only once. Otherwise the
/// same id would be submitted several times in a single parallel batch insert
/// and counted more than once.
///
/// # Errors
///
/// Returns `Error::Storage` on the first storage read failure.
#[cfg(feature = "persistence")]
fn reindex_stale_wal_ids(
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
    wal_touched_ids: &[u64],
    dimension: usize,
) -> Result<usize> {
    let storage = vector_storage.read();
    let mut stale: Vec<(u64, Vec<f32>)> = Vec::new();
    let mut seen: std::collections::HashSet<u64> =
        std::collections::HashSet::with_capacity(wal_touched_ids.len());
    for &id in wal_touched_ids {
        // A WAL may touch the same id repeatedly; check and re-insert it once.
        if !seen.insert(id) {
            continue;
        }
        // Ids without a mapping are not in the index at all; gap recovery
        // is responsible for inserting those.
        let Some(idx) = index.mappings.get_idx(id) else {
            continue;
        };
        match storage.retrieve(id) {
            Ok(Some(v)) if v.len() == dimension => {
                // An unreadable indexed vector (None) is treated as stale.
                let matches = index
                    .vectors
                    .with_vector(idx, |indexed| indexed == v.as_slice())
                    .unwrap_or(false);
                if !matches {
                    stale.push((id, v));
                }
            }
            Ok(_) => {}
            Err(e) => {
                return Err(Error::Storage(format!(
                    "failed to retrieve WAL-touched vector {id}: {e}"
                )));
            }
        }
    }
    // Release the storage lock before the potentially slow batch insert.
    drop(storage);

    let reindexed = reindex_vectors(index, &stale);
    if reindexed > 0 {
        tracing::warn!(
            reindexed,
            "Crash recovery: re-upserted stale WAL-touched vectors into HNSW"
        );
    }
    Ok(reindexed)
}