use crate::index::HnswIndex;
use crate::storage::{MmapStorage, PayloadStorage, VectorStorage};
use parking_lot::RwLock;
use std::sync::Arc;
/// Re-indexes vectors that are present in vector storage but missing from the HNSW index.
///
/// A read lock is taken on `vector_storage`. When the storage is empty, or its length
/// equals the index length, nothing is done and `Ok(0)` is returned. Otherwise the ids
/// missing from the index are collected, the matching vectors whose length equals
/// `dimension` are loaded, the lock is released, and the loaded vectors are inserted
/// into `index` as one batch.
///
/// # Returns
///
/// The count returned by the batch insert, or `0` when no gap was found.
///
/// # Errors
///
/// Propagates the error from `retrieve_valid_vectors` when a gap vector cannot be read.
pub(crate) fn recover_hnsw_gap(
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
    dimension: usize,
) -> crate::error::Result<usize> {
    // The read guard is confined to this block so the lock is released before re-indexing.
    let (vectors, gap_total) = {
        let guard = vector_storage.read();
        let stored = guard.len();
        let indexed = index.len();
        if stored == 0 || stored == indexed {
            return Ok(0);
        }

        let missing = find_gap_ids(&guard, index);
        if missing.is_empty() {
            return Ok(0);
        }

        let loaded = retrieve_valid_vectors(&guard, &missing, dimension)?;
        (loaded, missing.len())
    };

    let recovered = reindex_vectors(index, &vectors);
    tracing::warn!(
        recovered,
        gap_total,
        "Crash recovery: re-indexed gap vectors into HNSW"
    );
    Ok(recovered)
}
/// Collects every id reported by `storage` that `index.mappings` does not contain.
///
/// Ids are returned in the order `storage.ids()` yields them.
fn find_gap_ids(storage: &MmapStorage, index: &HnswIndex) -> Vec<u64> {
    let mut missing = Vec::new();
    for id in storage.ids() {
        if !index.mappings.contains(id) {
            missing.push(id);
        }
    }
    missing
}
fn retrieve_valid_vectors(
storage: &MmapStorage,
gap_ids: &[u64],
dimension: usize,
) -> crate::error::Result<Vec<(u64, Vec<f32>)>> {
let mut vectors = Vec::with_capacity(gap_ids.len());
for &id in gap_ids {
match storage.retrieve(id) {
Ok(Some(v)) if v.len() == dimension => vectors.push((id, v)),
Ok(Some(v)) => tracing::warn!(
id,
expected = dimension,
actual = v.len(),
"Skipping gap vector with mismatched dimension"
),
Ok(None) => {} Err(e) => {
return Err(crate::error::Error::Storage(format!(
"failed to retrieve gap vector {id}: {e}"
)))
}
}
}
Ok(vectors)
}
/// Inserts `vectors` into `index` as a single batch and returns the count the index reports.
///
/// Returns `0` without touching the index when `vectors` is empty.
fn reindex_vectors(index: &HnswIndex, vectors: &[(u64, Vec<f32>)]) -> usize {
    if vectors.is_empty() {
        0
    } else {
        // The batch API takes borrowed slices, so build (id, &[f32]) pairs over the owned data.
        let mut batch: Vec<(u64, &[f32])> = Vec::with_capacity(vectors.len());
        for (id, values) in vectors {
            batch.push((*id, values.as_slice()));
        }
        index.insert_batch_parallel(batch)
    }
}
use crate::collection::types::CollectionConfig;
use crate::error::{Error, Result};
use crate::storage::LogPayloadStorage;
pub(super) fn load_config(path: &std::path::Path) -> Result<CollectionConfig> {
let config_path = path.join("config.json");
let config_data = std::fs::read_to_string(&config_path)?;
let config: CollectionConfig =
serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
validate_schema_version(&config)?;
Ok(config)
}
fn validate_schema_version(config: &CollectionConfig) -> Result<()> {
use crate::collection::types::CURRENT_SCHEMA_VERSION;
let version = if config.schema_version == 0 {
1
} else {
config.schema_version
};
if version > CURRENT_SCHEMA_VERSION {
return Err(Error::IncompatibleSchemaVersion {
found: version,
supported: CURRENT_SCHEMA_VERSION,
});
}
Ok(())
}
/// Returns the point count taken from whichever storage applies to this collection.
///
/// For a `metadata_only` config the count is the number of ids in `payload_storage`;
/// otherwise it is the length of `vector_storage`. Each storage is accessed under its
/// read lock.
pub(super) fn reconcile_point_count(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    payload_storage: &Arc<RwLock<LogPayloadStorage>>,
) -> usize {
    if !config.metadata_only {
        return vector_storage.read().len();
    }

    payload_storage.read().ids().len()
}
/// Runs HNSW gap recovery for a collection when the `persistence` feature is enabled.
///
/// Recovery is skipped for `metadata_only` configs and for configs whose `dimension`
/// is `0`. When at least one vector was recovered, an info event is logged with the
/// collection name and the recovered count.
///
/// # Errors
///
/// Propagates any error returned by `recover_hnsw_gap`.
#[cfg(feature = "persistence")]
pub(super) fn run_crash_recovery(
    config: &CollectionConfig,
    vector_storage: &Arc<RwLock<MmapStorage>>,
    index: &Arc<HnswIndex>,
) -> Result<()> {
    let recovery_applies = !config.metadata_only && config.dimension != 0;

    if recovery_applies {
        let recovered = recover_hnsw_gap(vector_storage, index, config.dimension)?;
        if recovered > 0 {
            tracing::info!(
                collection = %config.name,
                recovered,
                "Collection gap recovery completed on open"
            );
        }
    }

    Ok(())
}
/// No-op variant of `run_crash_recovery`, compiled when the `persistence` feature is
/// disabled.
///
/// All arguments are ignored and `Ok(())` is returned unconditionally. The signature
/// matches the `persistence` variant.
#[cfg(not(feature = "persistence"))]
pub(super) fn run_crash_recovery(
_config: &CollectionConfig,
_vector_storage: &Arc<RwLock<MmapStorage>>,
_index: &Arc<HnswIndex>,
) -> Result<()> {
Ok(())
}