mindb 0.1.2

Lightweight embedded key–value store with a write-ahead log and zstd compression.
Documentation
//! High level query handlers that compose the different index layers.
#![allow(dead_code)]

use std::collections::HashSet;

use serde_json::Value;

use crate::index::{
    SequenceNumber, VersionPointer,
    bloom::BloomFilter,
    hot::HotIndex,
    secondary::{FullTextSidecar, JsonTrieSidecar, RoaringBitmapSidecar, SlugMap, TimeFenceIndex},
    sparse::SparseIndex,
};

/// Point-in-time MVCC view handed to every query handler in this module.
///
/// Handlers forward `sequence` to each index lookup they perform.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MvccSnapshot {
    /// Sequence number this snapshot is pinned to.
    pub sequence: SequenceNumber,
}

impl MvccSnapshot {
    /// Builds a snapshot pinned to `sequence`.
    pub fn new(sequence: SequenceNumber) -> Self {
        MvccSnapshot { sequence }
    }

    /// Builds a snapshot pinned to the largest representable sequence number,
    /// so the sequence bound never excludes a committed version.
    pub fn open() -> Self {
        Self::new(SequenceNumber::MAX)
    }
}

/// Bundle of the logical indexes the query handlers in this module read from.
#[derive(Clone)]
pub struct IndexRegistry {
    /// Hot index; every handler consults it for the latest pointer of a key.
    pub hot: HotIndex,
    /// Slug-to-key mapping used by `get_by_slug`.
    pub slug: SlugMap,
    /// Timestamp index used by `scan_time`.
    pub time_fence: TimeFenceIndex,
    /// Bitmap tag sidecar used by `filter`.
    pub roaring: RoaringBitmapSidecar,
    /// JSON equality sidecar used by `filter`.
    pub json_trie: JsonTrieSidecar,
    /// Full-text sidecar used by `search`.
    pub fts: FullTextSidecar,
}

impl IndexRegistry {
    /// Assembles a registry from its component indexes, taking ownership of each.
    pub fn new(
        hot: HotIndex,
        slug: SlugMap,
        time_fence: TimeFenceIndex,
        roaring: RoaringBitmapSidecar,
        json_trie: JsonTrieSidecar,
        fts: FullTextSidecar,
    ) -> Self {
        IndexRegistry {
            fts,
            json_trie,
            roaring,
            time_fence,
            slug,
            hot,
        }
    }
}

/// Segment-level accelerators exposed to the query layer.
///
/// Both members are optional borrowed views; `get` uses whichever are present.
/// The struct only holds `Option<&T>` fields, so it is cheaply `Copy`: callers can
/// pass the same accelerators to many `get` calls (which take them by value).
/// `Default` yields the same value as [`SegmentAccelerators::empty`].
#[derive(Clone, Copy, Default)]
pub struct SegmentAccelerators<'a> {
    /// Sparse segment index used to locate keys that miss the hot index.
    pub sparse: Option<&'a SparseIndex>,
    /// Bloom filter used to rule keys out before consulting `sparse`.
    pub bloom: Option<&'a BloomFilter>,
}

impl<'a> SegmentAccelerators<'a> {
    /// Accelerators with neither a sparse index nor a bloom filter.
    pub fn empty() -> Self {
        Self::default()
    }
}

/// Composite filter request spanning bitmap tags and JSON predicates.
pub struct FilterRequest<'a> {
    pub bitmap_tags: &'a [String],
    pub json_equals: &'a [(&'a str, &'a Value)],
}

/// Primary-key lookup for `key` under `snapshot`.
///
/// The hot index is tried first. On a miss, a supplied bloom filter short-circuits
/// the lookup to `None` when it reports the key as absent. Otherwise the sparse
/// segment index, if supplied, is asked to locate the key. Returns `None` when no
/// layer produces a pointer.
///
/// Unlike the scan/filter/search handlers in this module, the pointer returned here
/// is not checked with `is_visible_at`.
pub fn get(
    registry: &IndexRegistry,
    accelerators: SegmentAccelerators<'_>,
    key: &[u8],
    snapshot: MvccSnapshot,
) -> Option<VersionPointer> {
    let seq = snapshot.sequence;
    registry.hot.get_latest(key, seq).or_else(|| {
        let ruled_out = matches!(accelerators.bloom, Some(bloom) if !bloom.may_contain(key));
        if ruled_out {
            return None;
        }
        accelerators.sparse.and_then(|sparse| sparse.locate(key, seq))
    })
}

/// Resolves `slug` to its key through the slug map, then picks the pointer to return.
///
/// If the hot index has an entry for the resolved key at `snapshot`, that pointer is
/// returned. Otherwise the pointer recorded in the slug map is returned. Yields
/// `None` only when the slug map has no entry for `slug`. The pointer is not checked
/// with `is_visible_at`.
pub fn get_by_slug(
    registry: &IndexRegistry,
    slug: &str,
    snapshot: MvccSnapshot,
) -> Option<(Vec<u8>, VersionPointer)> {
    let (key, slug_pointer) = registry.slug.get_latest(slug, snapshot.sequence)?;
    let pointer = registry
        .hot
        .get_latest(&key, snapshot.sequence)
        .unwrap_or(slug_pointer);
    Some((key, pointer))
}

/// Returns the keys whose timestamp lies in the inclusive range `start..=end`.
///
/// Candidates come from the time-fence index. Each candidate's pointer is replaced
/// by the hot index's latest pointer when one exists. Entries whose resulting
/// pointer is not visible at `snapshot` are dropped.
pub fn scan_time(
    registry: &IndexRegistry,
    start: i64,
    end: i64,
    snapshot: MvccSnapshot,
) -> Vec<(Vec<u8>, VersionPointer)> {
    let seq = snapshot.sequence;
    registry
        .time_fence
        .scan_range(start, end, seq)
        .into_iter()
        .map(|(key, fence_pointer)| {
            let pointer = registry.hot.get_latest(&key, seq).unwrap_or(fence_pointer);
            (key, pointer)
        })
        .filter(|(_, pointer)| pointer.is_visible_at(seq))
        .collect()
}

/// Applies bitmap-tag and JSON-equality predicates and returns the matching keys.
///
/// All predicates are combined with logical AND:
/// - `bitmap_tags`, when non-empty, seeds the candidates with the keys returned by
///   the roaring sidecar's `intersect_keys`;
/// - each `(path, value)` pair in `json_equals` intersects the candidates with the
///   keys returned by the JSON trie sidecar's `query_equals`.
///
/// A request with no predicates at all matches nothing and yields an empty vector.
/// Surviving keys are resolved through the hot index. Keys with no hot entry, or
/// whose latest pointer is not visible at `snapshot`, are dropped. Results are
/// sorted by key bytes so repeated calls return the same order.
pub fn filter(
    registry: &IndexRegistry,
    request: &FilterRequest<'_>,
    snapshot: MvccSnapshot,
) -> Vec<(Vec<u8>, VersionPointer)> {
    let mut candidates: Option<HashSet<Vec<u8>>> = None;

    if !request.bitmap_tags.is_empty() {
        let tagged: HashSet<Vec<u8>> = registry
            .roaring
            .intersect_keys(request.bitmap_tags, snapshot.sequence)
            .into_iter()
            .collect();
        // The JSON predicates can only narrow the set further, so an empty tag
        // intersection already decides the answer; skip their queries.
        if tagged.is_empty() {
            return Vec::new();
        }
        candidates = Some(tagged);
    }

    for (path, value) in request.json_equals {
        let matched: HashSet<Vec<u8>> = registry
            .json_trie
            .query_equals(path, value, snapshot.sequence)
            .into_iter()
            .collect();
        let narrowed = match candidates.take() {
            Some(mut existing) => {
                // Narrow in place rather than cloning every surviving key into a new set.
                existing.retain(|key| matched.contains(key));
                existing
            }
            None => matched,
        };
        if narrowed.is_empty() {
            return Vec::new();
        }
        candidates = Some(narrowed);
    }

    let mut keys: Vec<Vec<u8>> = match candidates {
        Some(set) => set.into_iter().collect(),
        // No predicates were supplied: match nothing.
        None => return Vec::new(),
    };
    // HashSet iteration order is unspecified and varies between runs; sort so the
    // result order is deterministic.
    keys.sort_unstable();

    keys.into_iter()
        .filter_map(|key| {
            registry
                .hot
                .get_latest(&key, snapshot.sequence)
                .filter(|pointer| pointer.is_visible_at(snapshot.sequence))
                .map(|pointer| (key, pointer))
        })
        .collect()
}

/// Runs a full-text query against the full-text sidecar.
///
/// `query` is split on whitespace and each term is lowercased. A query with no terms
/// returns an empty vector without touching the index. Each key returned by the
/// sidecar is resolved through the hot index. Keys with no hot entry, or whose
/// pointer is not visible at `snapshot`, are skipped.
pub fn search(
    registry: &IndexRegistry,
    query: &str,
    snapshot: MvccSnapshot,
) -> Vec<(Vec<u8>, VersionPointer)> {
    let seq = snapshot.sequence;

    let mut terms: Vec<String> = Vec::new();
    for word in query.split_whitespace() {
        terms.push(word.to_lowercase());
    }
    if terms.is_empty() {
        return Vec::new();
    }

    let mut hits = Vec::new();
    for key in registry.fts.search(&terms, seq) {
        if let Some(pointer) = registry.hot.get_latest(&key, seq) {
            if pointer.is_visible_at(seq) {
                hits.push((key, pointer));
            }
        }
    }
    hits
}

/// Lists the keys starting with `prefix`, together with their hot-index pointers.
///
/// Entries are taken from the hot index's `scan_prefix` at `snapshot.sequence`, in
/// the order it yields them. Entries whose pointer is not visible at the snapshot
/// are removed.
pub fn scan_prefix(
    registry: &IndexRegistry,
    prefix: &[u8],
    snapshot: MvccSnapshot,
) -> Vec<(Vec<u8>, VersionPointer)> {
    let sequence = snapshot.sequence;
    let is_visible = |entry: &(Vec<u8>, VersionPointer)| entry.1.is_visible_at(sequence);
    registry
        .hot
        .scan_prefix(prefix, sequence)
        .into_iter()
        .filter(is_visible)
        .collect()
}