icydb-core 0.68.7

IcyDB — A type-safe, embedded ORM and schema system for the Internet Computer
Documentation
//! Module: index::scan
//! Responsibility: raw-range index scan resolution under cursor-owned continuation contracts.
//! Does not own: index persistence layout or predicate compilation.
//! Boundary: executor/query range reads go through this layer above `index::store`.

use crate::{
    db::{
        cursor::{
            ContinuationKeyRef, ContinuationRuntime, IndexScanContinuationInput, LoopAction,
            WindowCursorContract,
        },
        data::DataKey,
        direction::Direction,
        index::{
            IndexKey,
            entry::RawIndexEntry,
            envelope_is_empty,
            key::RawIndexKey,
            predicate::{IndexPredicateExecution, eval_index_execution_on_decoded_key},
            store::IndexStore,
        },
    },
    error::InternalError,
    model::index::IndexModel,
    types::EntityTag,
};
use std::ops::Bound;

// Raw byte copies of the index-key components requested by a projection,
// stored in the same order as the caller-supplied component indices.
type IndexComponentValues = Vec<Vec<u8>>;
// Result rows of a component-projecting range scan: one
// `(data key, component values)` pair per resolved data key.
type DataKeyComponentRows = Vec<(DataKey, IndexComponentValues)>;

impl IndexStore {
    // Upper bound on the initial capacity reserved for a limited scan's
    // output vector (applied as `limit.min(LIMITED_SCAN_PREALLOC_CAP)`).
    // Keep bounded scan preallocation modest so common page-limited reads
    // avoid the first growth step without reserving pathologically large
    // vectors from caller-supplied limits.
    const LIMITED_SCAN_PREALLOC_CAP: usize = 32;

    /// Resolve at most `limit` data keys from the index entries inside `bounds`.
    ///
    /// The scan runs in the direction carried by `continuation`. When
    /// `index_predicate_execution` is present, entries it rejects contribute
    /// no data keys.
    ///
    /// # Errors
    ///
    /// Propagates any `InternalError` raised by the shared range scan or by
    /// decoding an individual index entry.
    pub(in crate::db) fn resolve_data_values_in_raw_range_limited(
        &self,
        entity: EntityTag,
        index: &IndexModel,
        bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
        continuation: IndexScanContinuationInput<'_>,
        limit: usize,
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<Vec<DataKey>, InternalError> {
        // The per-entry decoder halts the scan once the output holds this
        // many keys.
        let row_cap = Some(limit);

        self.resolve_raw_range_limited(bounds, continuation, limit, |key, entry, rows| {
            Self::decode_index_entry_and_push(
                entity,
                index,
                key,
                entry,
                rows,
                row_cap,
                "range resolve",
                index_predicate_execution,
            )
        })
    }

    /// Resolve at most `limit` `(data key, component values)` rows from the
    /// index entries inside `bounds`.
    ///
    /// Each row carries byte copies of the index-key components selected by
    /// `component_indices`, in that order. The scan runs in the direction
    /// carried by `continuation`, and entries rejected by
    /// `index_predicate_execution` contribute no rows.
    ///
    /// # Errors
    ///
    /// Propagates any `InternalError` raised by the shared range scan or by
    /// decoding an individual index entry.
    #[expect(clippy::too_many_arguments)]
    pub(in crate::db) fn resolve_data_values_with_components_in_raw_range_limited(
        &self,
        entity: EntityTag,
        index: &IndexModel,
        bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
        continuation: IndexScanContinuationInput<'_>,
        limit: usize,
        component_indices: &[usize],
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<DataKeyComponentRows, InternalError> {
        // The per-entry decoder halts the scan once the output holds this
        // many rows.
        let row_cap = Some(limit);

        self.resolve_raw_range_limited(bounds, continuation, limit, |key, entry, rows| {
            Self::decode_index_entry_and_push_with_components(
                entity,
                index,
                key,
                entry,
                rows,
                row_cap,
                component_indices,
                "range resolve",
                index_predicate_execution,
            )
        })
    }

    // Drive one limit-bounded raw-range scan in the continuation's direction.
    //
    // `decode_and_push` is handed every visited entry together with the output
    // vector and returns `Ok(true)` when the scan must halt. Resumed scans
    // (those with an anchor) additionally route each key through the
    // continuation runtime before it is decoded.
    fn resolve_raw_range_limited<T, F>(
        &self,
        bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
        continuation: IndexScanContinuationInput<'_>,
        limit: usize,
        mut decode_and_push: F,
    ) -> Result<Vec<T>, InternalError>
    where
        F: FnMut(&RawIndexKey, &RawIndexEntry, &mut Vec<T>) -> Result<bool, InternalError>,
    {
        // A zero limit can never produce rows, so no scan is needed.
        if limit == 0 {
            return Ok(Vec::new());
        }

        // Both scan flavours start from the same capped preallocation.
        let initial_capacity = limit.min(Self::LIMITED_SCAN_PREALLOC_CAP);

        // Initial window: no resume anchor, so iterate the caller's bounds
        // directly without building the continuation runtime.
        if !continuation.has_anchor() {
            let (lower, upper) = bounds;
            if envelope_is_empty(lower, upper) {
                return Ok(Vec::new());
            }

            let mut rows = Vec::with_capacity(initial_capacity);
            let window = (lower.clone(), upper.clone());

            match continuation.direction() {
                Direction::Asc => {
                    for entry in self.map.range(window) {
                        if decode_and_push(entry.key(), &entry.value(), &mut rows)? {
                            break;
                        }
                    }
                }
                Direction::Desc => {
                    for entry in self.map.range(window).rev() {
                        if decode_and_push(entry.key(), &entry.value(), &mut rows)? {
                            break;
                        }
                    }
                }
            }

            return Ok(rows);
        }

        // Resumed window: let the continuation runtime derive the scan bounds
        // from the anchor, then bail out early if nothing is left to read.
        let runtime = ContinuationRuntime::new(continuation, WindowCursorContract::unbounded());
        let resumed_window = runtime.scan_bounds(bounds)?;

        if envelope_is_empty(&resumed_window.0, &resumed_window.1) {
            return Ok(Vec::new());
        }

        // Walk the resumed window in order; every key passes the continuation
        // guard before the decoder sees it.
        let mut rows = Vec::with_capacity(initial_capacity);

        match runtime.direction() {
            Direction::Asc => {
                for entry in self.map.range(resumed_window) {
                    let halted = Self::scan_range_entry(
                        &runtime,
                        entry.key(),
                        &entry.value(),
                        &mut rows,
                        &mut decode_and_push,
                    )?;
                    if halted {
                        break;
                    }
                }
            }
            Direction::Desc => {
                for entry in self.map.range(resumed_window).rev() {
                    let halted = Self::scan_range_entry(
                        &runtime,
                        entry.key(),
                        &entry.value(),
                        &mut rows,
                        &mut decode_and_push,
                    )?;
                    if halted {
                        break;
                    }
                }
            }
        }

        Ok(rows)
    }

    // Gate one scanned entry through the continuation runtime, then decode it.
    //
    // Returns `Ok(true)` when the scan must halt: either the continuation
    // answered `Stop`, or `decode_and_push` itself asked to halt. A `Skip`
    // answer leaves `out` untouched and lets the scan continue.
    fn scan_range_entry<T, F>(
        continuation: &ContinuationRuntime<'_>,
        raw_key: &RawIndexKey,
        value: &RawIndexEntry,
        out: &mut Vec<T>,
        decode_and_push: &mut F,
    ) -> Result<bool, InternalError>
    where
        F: FnMut(&RawIndexKey, &RawIndexEntry, &mut Vec<T>) -> Result<bool, InternalError>,
    {
        let action = continuation.accept_key(ContinuationKeyRef::scan(raw_key))?;

        match action {
            LoopAction::Skip => Ok(false),
            LoopAction::Stop => Ok(true),
            LoopAction::Emit => decode_and_push(raw_key, value, out),
        }
    }

    // Decode one index entry and append its data keys to `out`.
    //
    // Returns `Ok(true)` as soon as `out` holds exactly `limit` keys (the scan
    // should halt) and `Ok(false)` otherwise, including when the optional
    // index predicate rejects the entry.
    #[expect(clippy::too_many_arguments)]
    fn decode_index_entry_and_push(
        entity: EntityTag,
        index: &IndexModel,
        raw_key: &RawIndexKey,
        value: &RawIndexEntry,
        out: &mut Vec<DataKey>,
        limit: Option<usize>,
        context: &'static str,
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<bool, InternalError> {
        // Halt test shared by both payload paths: true only when a limit is
        // set and the output has just grown to exactly that size.
        let limit_reached = |len: usize| limit == Some(len);

        // Predicate gate: the raw key is decoded only when an index-only
        // predicate has to inspect it, so plain membership scans never pay
        // for key decoding.
        if let Some(execution) = index_predicate_execution {
            let decoded_key = IndexKey::try_from_raw(raw_key)
                .map_err(|err| InternalError::index_scan_key_corrupted_during(context, err))?;
            let accepted = eval_index_execution_on_decoded_key(&decoded_key, execution)?;
            if !accepted {
                return Ok(false);
            }
        }

        // Single-key payloads are handled without materializing the full
        // membership collection.
        let single_key = value
            .decode_single_key()
            .map_err(InternalError::index_entry_decode_failed)?;
        if let Some(storage_key) = single_key {
            out.push(DataKey::new(entity, storage_key));
            return Ok(limit_reached(out.len()));
        }

        // Multi-key payloads: decode every member, enforce the unique-index
        // invariant, then push keys until the limit is hit.
        let storage_keys = value
            .decode_keys()
            .map_err(InternalError::index_entry_decode_failed)?;

        if index.is_unique() && storage_keys.len() != 1 {
            return Err(InternalError::unique_index_entry_single_key_required());
        }

        for storage_key in storage_keys {
            out.push(DataKey::new(entity, storage_key));
            if limit_reached(out.len()) {
                return Ok(true);
            }
        }

        Ok(false)
    }

    // Decode one index entry and append `(data key, component values)` rows
    // to `out`.
    //
    // `component_indices` selects which decoded key components are copied into
    // every row, in that order. Returns `Ok(true)` as soon as `out` holds
    // exactly `limit` rows (the scan should halt) and `Ok(false)` otherwise,
    // including when the optional index predicate rejects the entry.
    //
    // Errors: a raw key that fails to decode, a predicate evaluation failure,
    // a requested component the decoded key does not have, an undecodable
    // entry payload, or a unique index whose entry does not hold exactly one
    // key.
    #[expect(clippy::too_many_arguments)]
    fn decode_index_entry_and_push_with_components(
        entity: EntityTag,
        index: &IndexModel,
        raw_key: &RawIndexKey,
        value: &RawIndexEntry,
        out: &mut Vec<(DataKey, Vec<Vec<u8>>)>,
        limit: Option<usize>,
        component_indices: &[usize],
        context: &'static str,
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<bool, InternalError> {
        // Phase 1: decode the raw key once; both the predicate and the
        // component projection read from this single decoded view.
        let decoded_key = IndexKey::try_from_raw(raw_key)
            .map_err(|err| InternalError::index_scan_key_corrupted_during(context, err))?;

        // Phase 2: evaluate the optional index-only predicate before copying
        // any component bytes, so rejected rows do not pay for allocations
        // that would be thrown away immediately.
        // NOTE: as a consequence, a missing projection component is reported
        // only for rows that pass the predicate.
        if let Some(execution) = index_predicate_execution
            && !eval_index_execution_on_decoded_key(&decoded_key, execution)?
        {
            return Ok(false);
        }

        // Phase 3: copy every requested component out of the decoded key.
        let mut components = Vec::with_capacity(component_indices.len());
        for component_index in component_indices {
            let Some(component) = decoded_key.component(*component_index) else {
                return Err(InternalError::index_projection_component_required(
                    index.name(),
                    *component_index,
                ));
            };
            components.push(component.to_vec());
        }

        // Phase 4: fast-path one-key entries without allocating the full
        // membership vector; the component values move into the single row.
        if let Some(storage_key) = value
            .decode_single_key()
            .map_err(InternalError::index_entry_decode_failed)?
        {
            out.push((DataKey::new(entity, storage_key), components));

            if let Some(limit) = limit
                && out.len() == limit
            {
                return Ok(true);
            }

            return Ok(false);
        }

        // Phase 5: decode multi-key entry payload and push bounded
        // `(data_key, components)` rows; every member shares the same
        // component values because they come from the same index key.
        let storage_keys = value
            .decode_keys()
            .map_err(InternalError::index_entry_decode_failed)?;

        if index.is_unique() && storage_keys.len() != 1 {
            return Err(InternalError::unique_index_entry_single_key_required());
        }

        for storage_key in storage_keys {
            out.push((DataKey::new(entity, storage_key), components.clone()));

            if let Some(limit) = limit
                && out.len() == limit
            {
                return Ok(true);
            }
        }

        Ok(false)
    }
}

///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::IndexStore;
    use crate::{
        db::{
            data::{DataKey, StorageKey},
            index::{RawIndexEntry, RawIndexKey},
        },
        model::index::IndexModel,
        traits::Storable,
        types::EntityTag,
    };
    use std::borrow::Cow;

    // Minimal non-unique, single-field index model used as the scan fixture.
    const TEST_SCAN_INDEX_FIELDS: &[&str] = &["name"];
    const TEST_SCAN_INDEX: IndexModel = IndexModel::new(
        "scan::idx_name",
        "scan::IndexStore",
        TEST_SCAN_INDEX_FIELDS,
        false,
    );

    // Without an index predicate the decoder must succeed on a one-byte raw
    // key (presumably not a decodable index key), showing that plain
    // membership scans never touch raw-key decoding.
    #[test]
    fn decode_index_entry_and_push_without_index_predicate_skips_raw_key_decode() {
        let entity = EntityTag::new(7);
        let opaque_raw_key = RawIndexKey::from_bytes(Cow::Owned(vec![0xFF]));
        let single_member_entry =
            RawIndexEntry::try_from_keys([StorageKey::Uint(11)]).expect("encode index entry");

        let mut resolved = Vec::new();
        let outcome = IndexStore::decode_index_entry_and_push(
            entity,
            &TEST_SCAN_INDEX,
            &opaque_raw_key,
            &single_member_entry,
            &mut resolved,
            Some(1),
            "test scan",
            None,
        );
        let halted = outcome.expect("plain membership scan should not require raw key decode");

        assert!(halted, "bounded single-row scan should stop at the limit");

        let expected = vec![DataKey::new(entity, StorageKey::Uint(11))];
        assert_eq!(resolved, expected);
    }
}