velesdb-core 1.13.2

High-performance vector database engine written in Rust
Documentation
//! Internal helpers for CRUD operations: quantization caching, secondary index
//! updates, `DedupMap`, and `QuantizationGuards`.

use crate::collection::types::Collection;
use crate::index::{JsonValue, SecondaryIndex};
use crate::point::Point;
use crate::quantization::{
    BinaryQuantizedVector, PQVector, ProductQuantizer, QuantizedVector, StorageMode,
};
use parking_lot::RwLockWriteGuard;
#[cfg(feature = "persistence")]
use rayon::prelude::*;
use std::collections::HashMap;

/// Number of `(id, vector)` samples that must be buffered before the product
/// quantizer is trained.
///
/// `Collection::cache_pq_vector` trains as soon as the training buffer reaches
/// this length.
const PQ_TRAINING_SAMPLES: usize = 128;

/// Pre-computed last-writer-wins dedup map: `point_id -> index_of_last_occurrence`.
///
/// Built once in `batch_store_all` and shared by both `write_deduped_payloads`
/// and `write_deduped_vectors` to avoid redundant map construction (Issue #425).
///
/// The value is a position in the batch slice: the point at position `i` is the
/// surviving occurrence of its id exactly when the map holds `id -> i`.
/// NOTE(review): `apply_histogram_replace_dedup` consumes the result of
/// `build_dedup_map` this way; confirm that helper returns this alias.
pub(super) type DedupMap = HashMap<u64, usize>;

/// Write-lock guards for quantization caches, acquired once per batch.
///
/// A field is `Some` only when the constructor that built this value took the
/// corresponding cache's write lock; see `acquire` and `acquire_pq_only`.
pub(super) struct QuantizationGuards<'a> {
    /// Write guard over the SQ8 cache (`point id -> QuantizedVector`), if held.
    pub(super) sq8: Option<RwLockWriteGuard<'a, HashMap<u64, QuantizedVector>>>,
    /// Write guard over the binary cache (`point id -> BinaryQuantizedVector`), if held.
    pub(super) binary: Option<RwLockWriteGuard<'a, HashMap<u64, BinaryQuantizedVector>>>,
    /// Write guard over the PQ cache (`point id -> PQVector`), if held.
    pub(super) pq: Option<RwLockWriteGuard<'a, HashMap<u64, PQVector>>>,
}

impl<'a> QuantizationGuards<'a> {
    /// Takes the write lock of the single cache that `mode` uses and leaves
    /// the other two fields `None`.
    ///
    /// `Full` and `RaBitQ` lock nothing.
    pub(super) fn acquire(collection: &'a Collection, mode: StorageMode) -> Self {
        let mut guards = Self {
            sq8: None,
            binary: None,
            pq: None,
        };
        match mode {
            StorageMode::SQ8 => guards.sq8 = Some(collection.sq8_cache.write()),
            StorageMode::Binary => guards.binary = Some(collection.binary_cache.write()),
            StorageMode::ProductQuantization => guards.pq = Some(collection.pq_cache.write()),
            StorageMode::Full | StorageMode::RaBitQ => {}
        }
        guards
    }

    /// Takes the PQ cache write lock when `mode` is product quantization and
    /// never touches the SQ8 or binary caches.
    ///
    /// Issue #486: SQ8 and Binary are quantized in parallel elsewhere, so only
    /// PQ still needs a guard for its sequential path.
    pub(super) fn acquire_pq_only(collection: &'a Collection, mode: StorageMode) -> Self {
        let pq = if matches!(mode, StorageMode::ProductQuantization) {
            Some(collection.pq_cache.write())
        } else {
            None
        };
        Self {
            sq8: None,
            binary: None,
            pq,
        }
    }
}

/// Returns the largest of 8, 4, 2 that divides `dimension` evenly, or 1 when
/// none of them does.
///
/// A `dimension` of 0 is divisible by everything and therefore yields 8.
fn auto_num_subspaces(dimension: usize) -> usize {
    [8usize, 4, 2]
        .iter()
        .copied()
        .find(|candidate| dimension % candidate == 0)
        .unwrap_or(1)
}

impl Collection {
    /// Stores the quantized form of `point.vector` in the cache that belongs
    /// to `storage_mode`.
    ///
    /// - `SQ8` / `Binary`: quantizes and inserts under `point.id`, but only
    ///   when the matching cache reference was supplied.
    /// - `ProductQuantization`: delegates to `cache_pq_vector`.
    /// - `Full` / `RaBitQ`: nothing is cached.
    pub(crate) fn cache_quantized_vector(
        &self,
        point: &Point,
        storage_mode: StorageMode,
        sq8_cache: Option<&mut std::collections::HashMap<u64, QuantizedVector>>,
        binary_cache: Option<&mut std::collections::HashMap<u64, BinaryQuantizedVector>>,
        pq_cache: Option<&mut std::collections::HashMap<u64, PQVector>>,
    ) {
        match storage_mode {
            StorageMode::SQ8 => {
                let Some(target) = sq8_cache else {
                    return;
                };
                target.insert(point.id, QuantizedVector::from_f32(&point.vector));
            }
            StorageMode::Binary => {
                let Some(target) = binary_cache else {
                    return;
                };
                target.insert(point.id, BinaryQuantizedVector::from_f32(&point.vector));
            }
            StorageMode::ProductQuantization => self.cache_pq_vector(point, pq_cache),
            StorageMode::Full | StorageMode::RaBitQ => {}
        }
    }

    /// Handles the Product Quantization arm of `cache_quantized_vector`.
    ///
    /// Trains the quantizer on first `PQ_TRAINING_SAMPLES` points, then
    /// backfills and quantizes subsequent points.
    ///
    /// Flow:
    /// 1. Takes the `pq_quantizer` write lock for the whole call.
    /// 2. While no quantizer exists, appends `(point.id, point.vector)` to
    ///    `pq_training_buffer`; once the buffer holds at least
    ///    `PQ_TRAINING_SAMPLES` entries, trains on every buffered vector and
    ///    drains the buffer into a local backfill list.
    /// 3. When both a cache and a quantizer are available, quantizes the
    ///    backfill list and then `point` itself into the cache.
    ///
    /// Training and quantization errors are swallowed: a failed `train` leaves
    /// the quantizer unset, and a failed `quantize` skips that one entry.
    fn cache_pq_vector(
        &self,
        point: &Point,
        pq_cache: Option<&mut std::collections::HashMap<u64, PQVector>>,
    ) {
        // Lock order: quantizer first, training buffer second (inside the branch below).
        let mut quantizer_guard = self.pq_quantizer.write();
        // Stays empty unless this call is the one that triggers training.
        let mut backfill_samples: Vec<(u64, Vec<f32>)> = Vec::new();

        if quantizer_guard.is_none() {
            let mut buffer = self.pq_training_buffer.write();
            buffer.push_back((point.id, point.vector.clone()));
            if buffer.len() >= PQ_TRAINING_SAMPLES {
                let training: Vec<Vec<f32>> =
                    buffer.iter().map(|(_, vector)| vector.clone()).collect();
                // At most 256 centroids, and never fewer than 2.
                let num_centroids = 256usize.min(training.len().max(2));
                // `.ok()` discards the training error; on failure the slot stays `None`.
                *quantizer_guard = ProductQuantizer::train(
                    &training,
                    auto_num_subspaces(point.vector.len()),
                    num_centroids,
                )
                .ok();
                // The buffer is emptied whether or not training succeeded.
                // NOTE(review): if training failed, or `pq_cache` is `None`, these
                // drained samples are dropped below without ever being cached.
                backfill_samples = buffer.drain(..).collect();
            }
        }

        if let (Some(cache), Some(quantizer)) = (pq_cache, quantizer_guard.as_ref()) {
            // Points that arrived before the quantizer existed get their codes now.
            for (id, vector) in backfill_samples {
                if let Ok(code) = quantizer.quantize(&vector) {
                    cache.insert(id, code);
                }
            }

            // NOTE(review): on the call that triggers training, `point` was already
            // part of `backfill_samples`, so it is quantized a second time here.
            if let Ok(code) = quantizer.quantize(&point.vector) {
                cache.insert(point.id, code);
            }
        }
    }

    /// Brings every secondary index in line with an upsert of point `id`.
    ///
    /// For each indexed field, the value found in `old_payload` (if any) is
    /// removed first, then the value found in `new_payload` (if any) is
    /// inserted. Fields that are missing, or that `JsonValue::from_json`
    /// rejects, are skipped on that side.
    pub(crate) fn update_secondary_indexes_on_upsert(
        &self,
        id: u64,
        old_payload: Option<&serde_json::Value>,
        new_payload: Option<&serde_json::Value>,
    ) {
        let registry = self.secondary_indexes.read();
        for (field, index) in registry.iter() {
            let previous = old_payload
                .and_then(|doc| doc.get(field))
                .and_then(JsonValue::from_json);
            if let Some(stale) = previous {
                self.remove_from_secondary_index(index, &stale, id);
            }

            let current = new_payload
                .and_then(|doc| doc.get(field))
                .and_then(JsonValue::from_json);
            if let Some(fresh) = current {
                self.insert_into_secondary_index(index, fresh, id);
            }
        }
    }

    /// Purges point `id` from every secondary index, using the values found in
    /// its last known payload.
    ///
    /// Without an `old_payload` there is nothing to look up, so the index
    /// registry is not even read.
    pub(crate) fn update_secondary_indexes_on_delete(
        &self,
        id: u64,
        old_payload: Option<&serde_json::Value>,
    ) {
        if let Some(payload) = old_payload {
            let registry = self.secondary_indexes.read();
            for (field, index) in registry.iter() {
                let Some(stale) = payload.get(field).and_then(JsonValue::from_json) else {
                    continue;
                };
                self.remove_from_secondary_index(index, &stale, id);
            }
        }
    }

    /// Registers `id` under `key` in `index`, creating the key's id list when
    /// it does not exist yet. An id that is already listed is not added again.
    // `&self` is unused: the work only concerns `index`, but the receiver keeps
    // the call sites uniform with the other methods of this impl block.
    #[allow(clippy::unused_self)]
    pub(crate) fn insert_into_secondary_index(
        &self,
        index: &SecondaryIndex,
        key: JsonValue,
        id: u64,
    ) {
        let SecondaryIndex::BTree(tree) = index;
        let mut entries = tree.write();
        let bucket = entries.entry(key).or_default();
        if !bucket.contains(&id) {
            bucket.push(id);
        }
    }

    /// Drops `id` from the id list stored under `key`, and deletes the key
    /// itself once that list is empty. An absent key is a no-op.
    // `&self` is unused; it is kept so call sites read like the other index helpers.
    #[allow(clippy::unused_self)]
    fn remove_from_secondary_index(&self, index: &SecondaryIndex, key: &JsonValue, id: u64) {
        let SecondaryIndex::BTree(tree) = index;
        let mut entries = tree.write();
        let Some(bucket) = entries.get_mut(key) else {
            return;
        };
        bucket.retain(|existing| *existing != id);
        if bucket.is_empty() {
            entries.remove(key);
        }
    }

    /// Quantizes a batch to SQ8 in parallel, then publishes the results to
    /// `sq8_cache` under one short write lock.
    ///
    /// Issue #486: `QuantizedVector::from_f32()` has no shared state, so the
    /// expensive part runs on rayon workers before the lock is taken.
    ///
    /// Duplicate ids inside the batch resolve last-writer-wins: results are
    /// inserted in batch order, so a later occurrence overwrites an earlier
    /// one (matching the WAL dedup semantics in `write_deduped_vectors`).
    #[cfg(feature = "persistence")]
    pub(super) fn batch_quantize_sq8_parallel(&self, points: &[Point]) {
        // Collecting into a `Vec` keeps the results in batch order.
        let encoded: Vec<(u64, QuantizedVector)> = points
            .par_iter()
            .map(|point| (point.id, QuantizedVector::from_f32(&point.vector)))
            .collect();
        // `extend` inserts sequentially, so later duplicates replace earlier ones.
        self.sq8_cache.write().extend(encoded);
    }

    /// Quantizes a batch to binary codes in parallel, then publishes the
    /// results to `binary_cache` under one short write lock.
    ///
    /// Issue #486: same strategy as the SQ8 path —
    /// `BinaryQuantizedVector::from_f32()` has no shared state, so it runs on
    /// rayon workers. Results are inserted in batch order, so for duplicate
    /// ids the last occurrence wins.
    #[cfg(feature = "persistence")]
    pub(super) fn batch_quantize_binary_parallel(&self, points: &[Point]) {
        // Collecting into a `Vec` keeps the results in batch order.
        let encoded: Vec<(u64, BinaryQuantizedVector)> = points
            .par_iter()
            .map(|point| (point.id, BinaryQuantizedVector::from_f32(&point.vector)))
            .collect();
        // `extend` inserts sequentially, so later duplicates replace earlier ones.
        self.binary_cache.write().extend(encoded);
    }

    /// Applies a batch of payload replacements to the persisted histograms,
    /// counting each point id once.
    ///
    /// The dedup map from `build_dedup_map` tells which position in `points`
    /// is the last occurrence of each id. Only that position contributes its
    /// payload; every earlier duplicate contributes `None`. The resulting
    /// list, index-aligned with `points`, is handed to
    /// `update_histograms_replace` together with `old_payloads` in one call.
    ///
    /// Shared by the upsert paths (`upsert`, `upsert_metadata`, `upsert_bulk`
    /// V2 and standard) so that:
    /// - Bug #47 — only the final payload per `point.id` is counted;
    /// - Bug #49 — decrement and increment happen in one histogram cycle
    ///   rather than two.
    ///
    /// Issue #450 Phase 3.1: extracted from four identical call sites in
    /// `collection/core/`.
    pub(super) fn apply_histogram_replace_dedup(
        &self,
        points: &[Point],
        old_payloads: &[Option<serde_json::Value>],
    ) {
        let last_index_of = Self::build_dedup_map(points);
        let mut surviving: Vec<Option<serde_json::Value>> = Vec::with_capacity(points.len());
        for (position, point) in points.iter().enumerate() {
            let is_final = last_index_of.get(&point.id) == Some(&position);
            surviving.push(if is_final {
                point.payload.clone()
            } else {
                None
            });
        }
        self.update_histograms_replace(old_payloads, &surviving);
    }
}