use crate::collection::types::Collection;
use crate::error::{Error, Result};
use crate::index::VectorIndex;
use crate::point::Point;
use crate::quantization::{BinaryQuantizedVector, PQVector, QuantizedVector, StorageMode};
use crate::storage::{LogPayloadStorage, PayloadStorage, VectorStorage};
use crate::validation::validate_dimension_match;
use parking_lot::RwLockWriteGuard;
use std::collections::{BTreeMap, HashMap};
struct QuantizationGuards<'a> {
sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
}
impl<'a> QuantizationGuards<'a> {
fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
Self {
sq8: matches!(mode, StorageMode::SQ8).then(|| collection.sq8_cache.write()),
binary: matches!(mode, StorageMode::Binary).then(|| collection.binary_cache.write()),
pq: matches!(mode, StorageMode::ProductQuantization)
.then(|| collection.pq_cache.write()),
}
}
}
impl Collection {
pub fn upsert(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
let points: Vec<Point> = points.into_iter().collect();
let config = self.config.read();
let dimension = config.dimension;
let storage_mode = config.storage_mode;
if config.metadata_only {
for point in &points {
if !point.vector.is_empty() {
return Err(Error::VectorNotAllowed(config.name.clone()));
}
}
drop(config);
return self.upsert_metadata(points);
}
drop(config);
for point in &points {
validate_dimension_match(dimension, point.dimension())?;
}
let sparse_batch = self.upsert_storage_and_index(&points, storage_mode)?;
self.apply_sparse_batch_upsert(&sparse_batch)?;
self.invalidate_caches_and_bump_generation();
Ok(())
}
fn upsert_storage_and_index(
&self,
points: &[Point],
storage_mode: StorageMode,
) -> Result<Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>> {
let mut vector_storage = self.vector_storage.write();
let mut payload_storage = self.payload_storage.write();
let mut quant_guards = QuantizationGuards::acquire(self, storage_mode);
let mut sparse_batch = Vec::new();
for point in points {
let old_payload = payload_storage.retrieve(point.id).ok().flatten();
vector_storage.store(point.id, &point.vector)?;
let (sq8, binary, pq) = (
quant_guards.sq8.as_deref_mut(),
quant_guards.binary.as_deref_mut(),
quant_guards.pq.as_deref_mut(),
);
self.cache_quantized_vector(point, storage_mode, sq8, binary, pq);
Self::store_or_delete_payload(&mut payload_storage, point)?;
self.update_secondary_indexes_on_upsert(
point.id,
old_payload.as_ref(),
point.payload.as_ref(),
);
self.index.insert(point.id, &point.vector);
Self::update_text_index(&self.text_index, point);
Self::collect_sparse_vectors(point, &mut sparse_batch);
}
let point_count = vector_storage.len();
vector_storage.flush()?;
payload_storage.flush()?;
drop(vector_storage);
drop(payload_storage);
self.config.write().point_count = point_count;
Ok(sparse_batch)
}
fn store_or_delete_payload(
payload_storage: &mut LogPayloadStorage,
point: &Point,
) -> Result<()> {
if let Some(payload) = &point.payload {
payload_storage.store(point.id, payload)?;
} else {
let _ = payload_storage.delete(point.id);
}
Ok(())
}
fn collect_sparse_vectors(
point: &Point,
sparse_batch: &mut Vec<(u64, BTreeMap<String, crate::index::sparse::SparseVector>)>,
) {
if let Some(sv_map) = &point.sparse_vectors {
if !sv_map.is_empty() {
sparse_batch.push((point.id, sv_map.clone()));
}
}
}
fn update_text_index(text_index: &crate::index::Bm25Index, point: &Point) {
if let Some(payload) = &point.payload {
let text = Self::extract_text_from_payload(payload);
if !text.is_empty() {
text_index.add_document(point.id, &text);
}
} else {
text_index.remove_document(point.id);
}
}
fn apply_sparse_batch_upsert(
&self,
sparse_batch: &[(u64, BTreeMap<String, crate::index::sparse::SparseVector>)],
) -> Result<()> {
if sparse_batch.is_empty() {
return Ok(());
}
#[cfg(feature = "persistence")]
{
for (point_id, sv_map) in sparse_batch {
for (name, sv) in sv_map {
let wal_path =
crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
}
}
}
let mut indexes = self.sparse_indexes.write();
for (point_id, sv_map) in sparse_batch {
for (name, sv) in sv_map {
let idx = indexes.entry(name.clone()).or_default();
idx.insert(*point_id, sv);
}
}
Ok(())
}
fn invalidate_caches_and_bump_generation(&self) {
*self.cached_stats.lock() = None;
self.write_generation
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn upsert_metadata(&self, points: impl IntoIterator<Item = Point>) -> Result<()> {
let points: Vec<Point> = points.into_iter().collect();
let mut payload_storage = self.payload_storage.write();
for point in &points {
let old_payload = payload_storage.retrieve(point.id).ok().flatten();
Self::store_or_delete_payload(&mut payload_storage, point)?;
Self::update_text_index(&self.text_index, point);
self.update_secondary_indexes_on_upsert(
point.id,
old_payload.as_ref(),
point.payload.as_ref(),
);
}
let point_count = payload_storage.ids().len();
payload_storage.flush()?;
drop(payload_storage);
self.config.write().point_count = point_count;
self.invalidate_caches_and_bump_generation();
Ok(())
}
pub fn upsert_bulk(&self, points: &[Point]) -> Result<usize> {
if points.is_empty() {
return Ok(0);
}
let dimension = self.config.read().dimension;
for point in points {
validate_dimension_match(dimension, point.dimension())?;
}
let vectors_for_hnsw: Vec<(u64, Vec<f32>)> =
points.iter().map(|p| (p.id, p.vector.clone())).collect();
let sparse_batch = Self::collect_sparse_batch(points);
self.bulk_store_vectors(&vectors_for_hnsw)?;
self.bulk_store_payloads(points)?;
let inserted = self.index.insert_batch_parallel(vectors_for_hnsw);
self.index.set_searching_mode();
self.config.write().point_count = self.vector_storage.read().len();
self.apply_sparse_batch_bulk(&sparse_batch)?;
self.invalidate_caches_and_bump_generation();
Ok(inserted)
}
fn collect_sparse_batch(
points: &[Point],
) -> BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> {
let mut batch: BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>> =
BTreeMap::new();
for point in points {
if let Some(sv_map) = &point.sparse_vectors {
for (name, sv) in sv_map {
batch
.entry(name.clone())
.or_default()
.push((point.id, sv.clone()));
}
}
}
batch
}
fn bulk_store_vectors(&self, vectors: &[(u64, Vec<f32>)]) -> Result<()> {
let refs: Vec<(u64, &[f32])> = vectors.iter().map(|(id, v)| (*id, v.as_slice())).collect();
let mut storage = self.vector_storage.write();
storage.store_batch(&refs)?;
storage.flush()?;
Ok(())
}
fn bulk_store_payloads(&self, points: &[Point]) -> Result<()> {
let mut storage = self.payload_storage.write();
for point in points {
if let Some(payload) = &point.payload {
storage.store(point.id, payload)?;
let text = Self::extract_text_from_payload(payload);
if !text.is_empty() {
self.text_index.add_document(point.id, &text);
}
}
}
storage.flush()?;
Ok(())
}
fn apply_sparse_batch_bulk(
&self,
sparse_batch: &BTreeMap<String, Vec<(u64, crate::index::sparse::SparseVector)>>,
) -> Result<()> {
if sparse_batch.is_empty() {
return Ok(());
}
#[cfg(feature = "persistence")]
{
for (name, docs) in sparse_batch {
let wal_path =
crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
for (point_id, sv) in docs {
crate::index::sparse::persistence::wal_append_upsert(&wal_path, *point_id, sv)?;
}
}
}
let mut indexes = self.sparse_indexes.write();
for (name, docs) in sparse_batch {
let idx = indexes.entry(name.clone()).or_default();
idx.insert_batch_chunk(docs);
}
Ok(())
}
#[must_use]
pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
let config = self.config.read();
let is_metadata_only = config.metadata_only;
drop(config);
let payload_storage = self.payload_storage.read();
if is_metadata_only {
ids.iter()
.map(|&id| {
let payload = payload_storage.retrieve(id).ok().flatten()?;
Some(Point {
id,
vector: Vec::new(),
payload: Some(payload),
sparse_vectors: None,
})
})
.collect()
} else {
let vector_storage = self.vector_storage.read();
ids.iter()
.map(|&id| {
let vector = vector_storage.retrieve(id).ok().flatten()?;
let payload = payload_storage.retrieve(id).ok().flatten();
Some(Point {
id,
vector,
payload,
sparse_vectors: None,
})
})
.collect()
}
}
pub fn delete(&self, ids: &[u64]) -> Result<()> {
if self.config.read().metadata_only {
self.delete_metadata_only(ids)?;
} else {
self.delete_vector_points(ids)?;
}
self.invalidate_caches_and_bump_generation();
Ok(())
}
fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
let mut payload_storage = self.payload_storage.write();
for &id in ids {
let old_payload = payload_storage.retrieve(id).ok().flatten();
payload_storage.delete(id)?;
self.text_index.remove_document(id);
self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
}
let point_count = payload_storage.ids().len();
drop(payload_storage);
self.config.write().point_count = point_count;
Ok(())
}
fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
let mut payload_storage = self.payload_storage.write();
let mut vector_storage = self.vector_storage.write();
let mut sq8_cache = self.sq8_cache.write();
let mut binary_cache = self.binary_cache.write();
let mut pq_cache = self.pq_cache.write();
for &id in ids {
let old_payload = payload_storage.retrieve(id).ok().flatten();
vector_storage.delete(id)?;
payload_storage.delete(id)?;
self.index.remove(id);
sq8_cache.remove(&id);
binary_cache.remove(&id);
pq_cache.remove(&id);
self.text_index.remove_document(id);
self.update_secondary_indexes_on_delete(id, old_payload.as_ref());
}
let point_count = vector_storage.len();
drop(vector_storage);
drop(payload_storage);
drop(sq8_cache);
drop(binary_cache);
drop(pq_cache);
self.config.write().point_count = point_count;
self.delete_from_sparse_indexes(ids)
}
fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
#[cfg(feature = "persistence")]
{
let indexes = self.sparse_indexes.read();
for (name, _) in indexes.iter() {
let wal_path =
crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
for &id in ids {
crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
}
}
}
let indexes = self.sparse_indexes.read();
for idx in indexes.values() {
for &id in ids {
idx.delete(id);
}
}
Ok(())
}
#[must_use]
pub fn len(&self) -> usize {
self.config.read().point_count
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.config.read().point_count == 0
}
#[must_use]
pub fn all_ids(&self) -> Vec<u64> {
self.payload_storage.read().ids()
}
}