velesdb-core 3.0.1

High-performance vector database engine written in Rust
Documentation
//! Secondary index types for metadata payload fields.

#[cfg(test)]
mod bitmap_tests;
#[cfg(test)]
mod ordered_ids_tests;

use parking_lot::RwLock;
use serde_json::Number;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Bound;

/// Orderable JSON primitive value used as a key in secondary indexes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum JsonValue {
    /// String JSON value.
    String(String),
    /// Numeric JSON value (normalized to f64 bits).
    Number(F64Key),
    /// Boolean JSON value.
    Bool(bool),
}

/// Wrapper type that provides total ordering for f64 values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct F64Key(u64);

impl From<f64> for F64Key {
    fn from(value: f64) -> Self {
        Self(value.to_bits())
    }
}

impl PartialOrd for F64Key {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for F64Key {
    fn cmp(&self, other: &Self) -> Ordering {
        f64::from_bits(self.0).total_cmp(&f64::from_bits(other.0))
    }
}

impl PartialOrd for JsonValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for JsonValue {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (Self::Bool(a), Self::Bool(b)) => a.cmp(b),
            (Self::Number(a), Self::Number(b)) => a.cmp(b),
            (Self::String(a), Self::String(b)) => a.cmp(b),
            (Self::Bool(_), _) | (Self::Number(_), Self::String(_)) => Ordering::Less,
            (Self::Number(_), Self::Bool(_)) | (Self::String(_), _) => Ordering::Greater,
        }
    }
}

impl JsonValue {
    /// Converts JSON payload primitive to an orderable key.
    #[must_use]
    pub fn from_json(value: &serde_json::Value) -> Option<Self> {
        match value {
            serde_json::Value::String(s) => Some(Self::String(s.clone())),
            serde_json::Value::Number(n) => Self::number_from_json(n),
            serde_json::Value::Bool(b) => Some(Self::Bool(*b)),
            _ => None,
        }
    }

    /// Converts `VelesQL` AST value into an index key.
    #[must_use]
    pub fn from_ast_value(value: &crate::velesql::Value) -> Option<Self> {
        match value {
            crate::velesql::Value::String(s) => Some(Self::String(s.clone())),
            #[allow(clippy::cast_precision_loss)]
            // Reason: index keys normalize all numerics to f64 for ordering.
            crate::velesql::Value::Integer(i) => Some(Self::Number(F64Key::from(*i as f64))),
            #[allow(clippy::cast_precision_loss)]
            // Reason: index keys normalize all numerics to f64 for ordering.
            crate::velesql::Value::UnsignedInteger(u) => {
                Some(Self::Number(F64Key::from(*u as f64)))
            }
            crate::velesql::Value::Float(f) => Some(Self::Number(F64Key::from(*f))),
            crate::velesql::Value::Boolean(b) => Some(Self::Bool(*b)),
            _ => None,
        }
    }

    fn number_from_json(number: &Number) -> Option<Self> {
        number.as_f64().map(|v| Self::Number(F64Key::from(v)))
    }
}

/// Secondary index implementation.
#[derive(Debug)]
pub(crate) enum SecondaryIndex {
    /// B-tree index mapping JSON primitive values to point IDs.
    BTree(RwLock<BTreeMap<JsonValue, Vec<u64>>>),
}

impl SecondaryIndex {
    /// Returns a [`RoaringBitmap`] of all point IDs matching the given value.
    ///
    /// The bitmap is built on-the-fly from the B-tree leaf. Returns
    /// `Some(empty)` when the value has no entries (a valid "no matches"
    /// pre-filter). Returns `None` when any matching ID exceeds [`u32::MAX`] and
    /// therefore cannot be represented in the bitmap — signalling an
    /// **incomplete** result so callers fall back to a full scan rather than
    /// silently dropping the high ID (correctness over the optimization).
    #[must_use]
    pub fn to_bitmap(&self, value: &JsonValue) -> Option<roaring::RoaringBitmap> {
        match self {
            Self::BTree(tree) => {
                let guard = tree.read();
                match guard.get(value) {
                    Some(ids) => ids_to_bitmap(ids),
                    None => Some(roaring::RoaringBitmap::new()),
                }
            }
        }
    }

    /// Returns a [`RoaringBitmap`] of all point IDs whose key falls within
    /// the given range bounds.
    ///
    /// Uses `BTreeMap::range()` for efficient ordered iteration. This powers
    /// Gt, Gte, Lt, Lte, and BETWEEN pre-filters. Returns `Some(empty)` when no
    /// keys fall within the range, and `None` when any in-range ID exceeds
    /// [`u32::MAX`] (incomplete — callers must fall back to a full scan).
    #[must_use]
    pub fn range_bitmap(
        &self,
        from: Bound<&JsonValue>,
        to: Bound<&JsonValue>,
    ) -> Option<roaring::RoaringBitmap> {
        match self {
            Self::BTree(tree) => {
                let guard = tree.read();
                let mut bm = roaring::RoaringBitmap::new();
                for ids in guard.range((from, to)).map(|(_, v)| v) {
                    for &id in ids {
                        bm.insert(u32::try_from(id).ok()?);
                    }
                }
                Some(bm)
            }
        }
    }

    /// Returns up to `limit` point IDs in this index's key order — ascending,
    /// or descending when `descending` is true — with IDs inside an equal-key
    /// bucket emitted in ascending ID order for determinism.
    ///
    /// This is the ordered-iteration primitive for index-backed
    /// `ORDER BY <field> LIMIT k` top-k (the B001 perf follow-up): a
    /// `BTreeMap` walk costs `O(log n + k)` instead of the `O(n log n)`
    /// full-scan sort. It only sees rows that carry the field — rows missing
    /// the field (which a full `ORDER BY` sorts first for ASC / last for DESC)
    /// are absent, so callers MUST restrict use to fully-covered fields and
    /// fall back to the full scan otherwise.
    // Consumed by the `ORDER BY <field> LIMIT k` pushdown (routing at
    // `Collection::try_ordered_index_scan`, gated by
    // `order_by_requires_exhaustive_fetch`) — phase 2 of the ordered-index
    // EPIC, see docs/planning/CORE_PARITY_REMEDIATION.md.
    #[cfg(test)]
    #[must_use]
    pub fn ordered_ids(&self, descending: bool, limit: usize) -> Vec<u64> {
        let Self::BTree(tree) = self;
        walk_ordered(&tree.read(), descending, limit)
    }

    /// Returns the top `limit` ordered IDs **only when the index fully covers**
    /// `point_count` rows — i.e. every point carries the indexed field
    /// (`Σ bucket lengths == point_count`). Returns `None` otherwise.
    ///
    /// The coverage check and the ordered walk share **one** read lock so a
    /// concurrent upsert cannot slip a row in between the two (TOCTOU). A
    /// partially-covered index omits rows missing the field (which a full
    /// `ORDER BY` places first for ASC / last for DESC), so the caller must fall
    /// back to the exhaustive scan — that's what `None` signals.
    #[must_use]
    pub fn ordered_ids_if_covered(
        &self,
        descending: bool,
        limit: usize,
        point_count: usize,
    ) -> Option<Vec<u64>> {
        let Self::BTree(tree) = self;
        let guard = tree.read();
        let covered: usize = guard.values().map(Vec::len).sum();
        if covered != point_count {
            return None;
        }
        Some(walk_ordered(&guard, descending, limit))
    }

    /// Returns the point IDs of the leading lead-key buckets (in `descending`
    /// key order) whose cumulative size first reaches `min_rows`, **only when
    /// the index fully covers** `point_count`. Whole buckets are absorbed (never
    /// cut mid-bucket), IDs ascending within a bucket; all IDs are returned when
    /// the whole index holds fewer than `min_rows`. `None` when coverage is
    /// incomplete.
    ///
    /// Backs multi-column `ORDER BY <lead_field>, ...` (EPIC-081 phase 3c): the
    /// lead key dominates the total order, so after a full multi-key sort the top
    /// `min_rows` rows lie entirely within these leading buckets — the caller
    /// hydrates them, applies the exhaustive multi-key sort, then OFFSET/LIMIT.
    /// The coverage check and the walk share one read lock (TOCTOU-safe).
    #[must_use]
    pub fn ordered_prefix_if_covered(
        &self,
        descending: bool,
        min_rows: usize,
        point_count: usize,
    ) -> Option<Vec<u64>> {
        let Self::BTree(tree) = self;
        let guard = tree.read();
        let covered: usize = guard.values().map(Vec::len).sum();
        if covered != point_count {
            return None;
        }
        Some(walk_whole_buckets(&guard, descending, min_rows))
    }
}

/// Walks the B-tree in key order (ascending, or descending when `descending`)
/// collecting up to `limit` point IDs, ascending by ID within each equal-key
/// bucket. The caller holds the read guard so the coverage check and the walk
/// stay atomic.
fn walk_ordered(tree: &BTreeMap<JsonValue, Vec<u64>>, descending: bool, limit: usize) -> Vec<u64> {
    let mut out: Vec<u64> = Vec::new();
    let mut collect = |buckets: &mut dyn Iterator<Item = &Vec<u64>>| {
        for ids in buckets {
            if out.len() >= limit {
                break;
            }
            push_bucket_capped(ids, limit, &mut out);
        }
    };
    if descending {
        collect(&mut tree.values().rev());
    } else {
        collect(&mut tree.values());
    }
    out
}

/// Walks the B-tree in key order (ascending, or descending when `descending`)
/// absorbing WHOLE buckets (IDs ascending within) until the accumulated count
/// first reaches `min_rows`. Check-then-absorb, so `min_rows == 0` touches no
/// bucket and the result holds complete leading buckets — never a partial one,
/// which a multi-key secondary sort requires.
fn walk_whole_buckets(
    tree: &BTreeMap<JsonValue, Vec<u64>>,
    descending: bool,
    min_rows: usize,
) -> Vec<u64> {
    let mut out: Vec<u64> = Vec::new();
    let mut absorb = |buckets: &mut dyn Iterator<Item = &Vec<u64>>| {
        for ids in buckets {
            if out.len() >= min_rows {
                break;
            }
            let mut bucket = ids.clone();
            bucket.sort_unstable();
            out.extend(bucket);
        }
    };
    if descending {
        absorb(&mut tree.values().rev());
    } else {
        absorb(&mut tree.values());
    }
    out
}

/// Appends `ids` (sorted ascending for determinism) to `out`, stopping once
/// `out` reaches `limit`.
fn push_bucket_capped(ids: &[u64], limit: usize, out: &mut Vec<u64>) {
    let mut bucket = ids.to_vec();
    bucket.sort_unstable();
    for id in bucket {
        if out.len() >= limit {
            return;
        }
        out.push(id);
    }
}

/// Converts a slice of `u64` point IDs into a [`RoaringBitmap`].
///
/// `RoaringBitmap` stores `u32` values. Returns `None` if any ID exceeds
/// [`u32::MAX`]: the bitmap would silently omit that ID, so callers that fetch
/// only the bitmap's IDs (e.g. the JOIN pre-filter) would drop a real match.
/// Signalling `None` forces those callers to fall back to a full scan.
fn ids_to_bitmap(ids: &[u64]) -> Option<roaring::RoaringBitmap> {
    let mut bm = roaring::RoaringBitmap::new();
    for &id in ids {
        bm.insert(u32::try_from(id).ok()?);
    }
    Some(bm)
}