icydb-core 0.132.0

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
mod cache;
mod direct;
mod metrics;
mod primary_key;

use crate::{
    db::data::{
        DataKey, RawRow, StructuralRowContract, StructuralRowDecodeError, StructuralRowFieldBytes,
        persisted_row::{
            codec::{ScalarSlotValueRef, decode_scalar_slot_value},
            contract::{decode_slot_value_for_field, validate_non_scalar_slot_value},
            reader::{
                cache::{
                    ValidatedScalarSlotValue, build_initial_slot_cache,
                    materialize_validated_scalar_slot_value, scalar_slot_value_ref_from_validated,
                    validated_scalar_slot_value,
                },
                primary_key::validate_storage_key_from_primary_key_bytes_with_field,
            },
            types::{CanonicalSlotReader, SlotReader},
        },
    },
    error::InternalError,
    model::{
        entity::EntityModel,
        field::{FieldModel, LeafCodec},
    },
    value::{StorageKey, Value},
};
use std::{borrow::Cow, cell::OnceCell};

pub(in crate::db::data::persisted_row) use cache::CachedSlotValue;
pub(in crate::db) use direct::{
    decode_dense_raw_row_with_contract, decode_sparse_indexed_raw_row_with_contract,
    decode_sparse_raw_row_with_contract, decode_sparse_required_slot_with_contract,
    decode_sparse_required_slot_with_contract_and_fields,
};
#[cfg(feature = "diagnostics")]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
pub use metrics::{StructuralReadMetrics, with_structural_read_metrics};
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use metrics::{StructuralReadMetrics, with_structural_read_metrics};

///
/// StructuralSlotReader
///
/// StructuralSlotReader adapts the current persisted-row bytes into the
/// canonical slot-reader seam.
/// It validates the persisted row envelope eagerly, then validates and
/// materializes individual slots only when a caller actually touches them.
/// That keeps row-backed selective reads from paying an O(field_count) decode
/// loop before the first real slot access.
///
/// Readers built through `from_raw_row` additionally validate every declared
/// slot at construction time; readers built through
/// `from_raw_row_with_contract` skip that pass and retain no entity model.
///

pub(in crate::db) struct StructuralSlotReader<'a> {
    /// Entity model retained only by model-backed readers; `None` when the
    /// reader was built from a bare structural row contract.
    model: Option<&'static EntityModel>,
    /// Structural row contract used for field lookup, entity-path reporting,
    /// and primary-key slot resolution.
    contract: StructuralRowContract,
    /// Per-slot field payloads scanned from the borrowed raw row.
    field_bytes: StructuralRowFieldBytes<'a>,
    /// Per-slot cache holding lazily validated and materialized slot values.
    pub(in crate::db::data::persisted_row) cached_values: Vec<CachedSlotValue>,
    /// Structural read probe; compiled only for tests and the `diagnostics`
    /// feature.
    #[cfg(any(test, feature = "diagnostics"))]
    metrics: metrics::StructuralReadProbe,
}

impl<'a> StructuralSlotReader<'a> {
    /// Build one model-backed slot reader over one persisted row and validate
    /// every declared slot before handing the reader to the caller.
    ///
    /// # Errors
    ///
    /// Returns an error when the row cannot be scanned against the model's
    /// structural contract or when any declared slot fails validation.
    pub(in crate::db) fn from_raw_row(
        raw_row: &'a RawRow,
        model: &'static EntityModel,
    ) -> Result<Self, InternalError> {
        Self::from_raw_row_with_model(raw_row, model).and_then(|reader| {
            // Fail closed: reject malformed slots even if nobody reads them later.
            reader.validate_all_declared_slots()?;

            Ok(reader)
        })
    }

    /// Build one slot reader over one persisted row from a static structural
    /// row contract alone; the resulting reader retains no entity model.
    ///
    /// # Errors
    ///
    /// Returns an error when the raw row cannot be scanned against `contract`.
    pub(in crate::db) fn from_raw_row_with_contract(
        raw_row: &'a RawRow,
        contract: StructuralRowContract,
    ) -> Result<Self, InternalError> {
        // Scan the row first so a malformed row never allocates the slot cache.
        let field_bytes = StructuralRowFieldBytes::from_raw_row_with_contract(raw_row, contract)
            .map_err(StructuralRowDecodeError::into_internal_error)?;
        let cached_values = build_initial_slot_cache(contract);

        Ok(Self {
            model: None,
            contract,
            field_bytes,
            cached_values,
            #[cfg(any(test, feature = "diagnostics"))]
            metrics: metrics::StructuralReadProbe::begin(contract.field_count()),
        })
    }

    // Build one slot reader over one persisted row while retaining the full
    // entity model for typed slot-reader seams that still require it.
    fn from_raw_row_with_model(
        raw_row: &'a RawRow,
        model: &'static EntityModel,
    ) -> Result<Self, InternalError> {
        let contract = StructuralRowContract::from_model(model);
        let field_bytes = StructuralRowFieldBytes::from_raw_row_with_contract(raw_row, contract)
            .map_err(StructuralRowDecodeError::into_internal_error)?;
        let reader = Self {
            model: Some(model),
            contract,
            field_bytes,
            cached_values: build_initial_slot_cache(contract),
            #[cfg(any(test, feature = "diagnostics"))]
            metrics: metrics::StructuralReadProbe::begin(contract.field_count()),
        };

        Ok(reader)
    }

    /// Return the entity model retained by a model-backed reader.
    ///
    /// # Panics
    ///
    /// Panics when the reader retains no entity model, which is the case for
    /// readers built from a bare structural row contract.
    #[must_use]
    pub(in crate::db) const fn model(&self) -> &'static EntityModel {
        match self.model {
            Some(model) => model,
            None => {
                panic!("model-backed structural slot reader required by typed slot-reader seam")
            }
        }
    }

    /// Check the row's primary-key slot against the storage key carried by the
    /// authoritative `data_key`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the storage-key validation.
    pub(in crate::db) fn validate_storage_key(
        &self,
        data_key: &DataKey,
    ) -> Result<(), InternalError> {
        let expected_key = data_key.storage_key();

        self.validate_storage_key_value(expected_key)
    }

    // Check the row's primary-key slot against one authoritative storage key,
    // so call sites do not have to rebuild a full `DataKey` wrapper.
    fn validate_storage_key_value(&self, expected_key: StorageKey) -> Result<(), InternalError> {
        // Model-backed readers resolve the slot through the model; contract-only
        // readers fall back to the structural contract.
        let pk_slot = match self.model {
            Some(model) => model.primary_key_slot(),
            None => self.contract.primary_key_slot(),
        };
        let pk_field = self.field_model(pk_slot)?;

        // Run the scalar validation (and fill its cache entry) before the shared
        // raw-bytes validator performs the authoritative key comparison.
        let cached_entry = self.cached_values.get(pk_slot);
        if let (LeafCodec::Scalar(_), Some(CachedSlotValue::Scalar { validated, .. })) =
            (pk_field.leaf_codec(), cached_entry)
        {
            let _ = self.required_validated_scalar_slot_value(pk_slot, validated)?;
        }

        let pk_bytes = self.required_field_bytes(pk_slot, pk_field.name())?;

        validate_storage_key_from_primary_key_bytes_with_field(pk_bytes, pk_field, expected_key)
    }

    // Look up the field model stored at `slot` in the structural contract,
    // reporting an out-of-bounds slot as an internal lookup error.
    fn field_model(&self, slot: usize) -> Result<&FieldModel, InternalError> {
        let Some(field) = self.contract.fields().get(slot) else {
            return Err(InternalError::persisted_row_slot_lookup_out_of_bounds(
                self.contract.entity_path(),
                slot,
            ));
        };

        Ok(field)
    }

    // Return the validated scalar state for `slot`, decoding the raw bytes only
    // when the cell is still empty. The frozen state is shared by scalar reads
    // and by semantic `Value` materialization.
    fn required_validated_scalar_slot_value(
        &self,
        slot: usize,
        validated: &OnceCell<ValidatedScalarSlotValue>,
    ) -> Result<ValidatedScalarSlotValue, InternalError> {
        // Fast path: the slot was validated by an earlier access.
        if let Some(&frozen) = validated.get() {
            return Ok(frozen);
        }

        let field = self.field_model(slot)?;
        let raw_value = self.required_field_bytes(slot, field.name())?;
        let codec = match field.leaf_codec() {
            LeafCodec::Scalar(codec) => codec,
            LeafCodec::StructuralFallback => {
                return Err(InternalError::persisted_row_decode_failed(format!(
                    "validated scalar cache routed through non-scalar field contract: slot={slot}",
                )));
            }
        };

        #[cfg(any(test, feature = "diagnostics"))]
        self.metrics.record_validated_slot();
        let decoded = decode_scalar_slot_value(raw_value, codec, field.name())?;
        let fresh = validated_scalar_slot_value(decoded);
        // The cell was observed empty above, so the outcome of `set` carries no
        // information worth acting on.
        let _ = validated.set(fresh);

        Ok(fresh)
    }

    // Borrow one declared slot value from the validated structural cache,
    // materializing the semantic `Value` lazily when the caller first touches
    // that slot.
    //
    // Errors when `slot` has no cache entry, when the slot's field model or
    // payload cannot be resolved, or when validation/decoding of the payload
    // fails.
    pub(in crate::db::data::persisted_row) fn required_cached_value(
        &self,
        slot: usize,
    ) -> Result<&Value, InternalError> {
        let cached = self.cached_values.get(slot).ok_or_else(|| {
            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(
                self.contract.entity_path(),
                slot,
            )
        })?;

        match cached {
            CachedSlotValue::Scalar {
                validated,
                materialized,
            } => {
                // Scalar slots always pass through the validated-scalar cache
                // first; once validation has run this is only a cell read.
                let validated = self.required_validated_scalar_slot_value(slot, validated)?;
                if materialized.get().is_none() {
                    let value = materialize_validated_scalar_slot_value(
                        validated,
                        self.contract,
                        &self.field_bytes,
                        slot,
                    )?;
                    let _ = materialized.set(value);
                }

                materialized.get().ok_or_else(|| {
                    InternalError::persisted_row_decode_failed(format!(
                        "structural scalar cache failed to materialize deferred value: slot={slot}",
                    ))
                })
            }
            CachedSlotValue::Deferred { materialized } => {
                // Cache hit: hand back the materialized value without
                // re-resolving the field model or the raw payload.
                if let Some(value) = materialized.get() {
                    return Ok(value);
                }

                // Cache miss: validate and decode the payload exactly once.
                let field = self.field_model(slot)?;
                let raw_value = self.required_field_bytes(slot, field.name())?;
                #[cfg(any(test, feature = "diagnostics"))]
                {
                    self.metrics.record_validated_slot();
                    self.metrics.record_validated_non_scalar();
                    self.metrics.record_materialized_non_scalar();
                }
                validate_non_scalar_slot_value(raw_value, field)?;
                let value = decode_slot_value_for_field(field, raw_value)?;
                let _ = materialized.set(value);

                materialized.get().ok_or_else(|| {
                    InternalError::persisted_row_decode_failed(format!(
                        "structural slot cache failed to materialize deferred value: slot={slot}",
                    ))
                })
            }
        }
    }

    // Borrow the raw payload of one declared slot. A missing payload is
    // reported as a declared-field-missing error naming `field_name`, not as an
    // ordinary "absent" result.
    pub(in crate::db) fn required_field_bytes(
        &self,
        slot: usize,
        field_name: &str,
    ) -> Result<&[u8], InternalError> {
        let Some(payload) = self.field_bytes.field(slot) else {
            return Err(InternalError::persisted_row_declared_field_missing(
                field_name,
            ));
        };

        Ok(payload)
    }

    // Walk every field declared by the contract and validate its payload once,
    // stopping at the first failure, so fail-closed callers reject malformed
    // unused fields before projection, relation, or commit logic runs.
    fn validate_all_declared_slots(&self) -> Result<(), InternalError> {
        self.contract.fields().iter().enumerate().try_for_each(
            |(slot, field)| -> Result<(), InternalError> {
                let raw_value = self.required_field_bytes(slot, field.name())?;

                match field.leaf_codec() {
                    LeafCodec::StructuralFallback => {
                        #[cfg(any(test, feature = "diagnostics"))]
                        {
                            self.metrics.record_validated_slot();
                            self.metrics.record_validated_non_scalar();
                        }
                        validate_non_scalar_slot_value(raw_value, field)?;
                    }
                    LeafCodec::Scalar(codec) => {
                        #[cfg(any(test, feature = "diagnostics"))]
                        self.metrics.record_validated_slot();
                        // Only the decode outcome matters here; the decoded
                        // value itself is discarded.
                        decode_scalar_slot_value(raw_value, codec, field.name())?;
                    }
                }

                Ok(())
            },
        )
    }
}

impl SlotReader for StructuralSlotReader<'_> {
    // Forward to the inherent `model` accessor, which takes precedence over
    // this trait method during resolution.
    fn model(&self) -> &'static EntityModel {
        Self::model(self)
    }

    // Report whether the scanned row carries a payload for `slot`.
    fn has(&self, slot: usize) -> bool {
        self.get_bytes(slot).is_some()
    }

    // Borrow the raw payload for `slot`, if the scanned row carries one.
    fn get_bytes(&self, slot: usize) -> Option<&[u8]> {
        self.field_bytes.field(slot)
    }

    // Read one slot through the validated scalar cache. Fields whose leaf
    // codec is not scalar yield `Ok(None)`.
    fn get_scalar(&self, slot: usize) -> Result<Option<ScalarSlotValueRef<'_>>, InternalError> {
        let field = self.field_model(slot)?;
        if !matches!(field.leaf_codec(), LeafCodec::Scalar(_)) {
            return Ok(None);
        }

        let cache_entry = self.cached_values.get(slot).ok_or_else(|| {
            InternalError::persisted_row_slot_cache_lookup_out_of_bounds(
                self.contract.entity_path(),
                slot,
            )
        })?;
        let CachedSlotValue::Scalar { validated, .. } = cache_entry else {
            return Err(InternalError::persisted_row_decode_failed(format!(
                "structural scalar slot routed through non-scalar cache variant: slot={slot}",
            )));
        };

        let validated = self.required_validated_scalar_slot_value(slot, validated)?;

        scalar_slot_value_ref_from_validated(validated, self.contract, &self.field_bytes, slot)
            .map(Some)
    }

    // Clone the cached semantic value for `slot`; a successful read always
    // yields `Some`.
    fn get_value(&mut self, slot: usize) -> Result<Option<Value>, InternalError> {
        self.required_cached_value(slot)
            .map(|value| Some(value.clone()))
    }
}

impl CanonicalSlotReader for StructuralSlotReader<'_> {
    // Borrow the raw payload for `slot`, resolving the field model first so an
    // out-of-range slot is reported before a missing payload.
    fn required_bytes(&self, slot: usize) -> Result<&[u8], InternalError> {
        let field = self.field_model(slot)?;

        self.required_field_bytes(slot, field.name())
    }

    // Read one scalar slot that the contract declares as scalar.
    // NOTE(review): in release builds a non-scalar field surfaces here as a
    // "declared field missing" error — confirm that is the intended report.
    fn required_scalar(&self, slot: usize) -> Result<ScalarSlotValueRef<'_>, InternalError> {
        let field = self.field_model(slot)?;
        debug_assert!(matches!(field.leaf_codec(), LeafCodec::Scalar(_)));

        match self.get_scalar(slot)? {
            Some(scalar) => Ok(scalar),
            None => Err(InternalError::persisted_row_declared_field_missing(
                field.name(),
            )),
        }
    }

    // Return an owned copy of the cached semantic value for `slot`.
    fn required_value_by_contract(&self, slot: usize) -> Result<Value, InternalError> {
        self.required_cached_value(slot).cloned()
    }

    // Return the cached semantic value for `slot` as a borrow, without cloning.
    fn required_value_by_contract_cow(&self, slot: usize) -> Result<Cow<'_, Value>, InternalError> {
        self.required_cached_value(slot).map(Cow::Borrowed)
    }
}