tempest-engine 0.0.2

Relational database engine for TempestDB
Documentation
use bytes::{Buf, Bytes};
use derive_more::{Display, Error, From};
use tempest_core::encoding::{LexicalDecodeError, RawDecodeError};

use crate::{
    base::KeySpace,
    catalog::{
        CatalogState,
        schema::{TypeExpr, TypeId, VariantId},
    },
    ctrl::hlc::HlcTimestamp,
    row::resolved::ResolvedTable,
    types::{TempestType, TempestValue},
};

/// Errors that can be returned while decoding a stored row.
///
/// `From` conversions are derived for the wrapping variants so that the
/// underlying decode errors can be propagated with `?`.
#[derive(Debug, Display, From, Error)]
pub enum RowDecodeError {
    /// The leading key-space byte of a key was not `KeySpace::TableRow`.
    /// Carries the byte that was actually read.
    ///
    /// `From<u8>` is deliberately not generated for this variant, and the
    /// `u8` payload is not treated as an error source.
    #[from(skip)]
    #[display("invalid key space: {}", _0)]
    InvalidKeySpace(#[error(not(source))] u8),
    /// Wraps a `tempest_core` lexical decode failure.
    // NOTE(review): presumably raised by `TempestValue::decode_lexical` when
    // decoding primary-key columns — confirm against its signature.
    LexicalDecodeError(LexicalDecodeError),
    /// Wraps a `tempest_core` raw decode failure.
    // NOTE(review): presumably raised by `TempestValue::decode` when decoding
    // non-key columns — confirm against its signature.
    RawDecodeError(RawDecodeError),
}

/// Decodes the key and value buffers of a stored row into column values for
/// one resolved table.
#[derive(Debug)]
pub(crate) struct RowDecoder<'a> {
    /// The table whose `flat_fields` describe the columns being decoded.
    table: &'a ResolvedTable<'a>,
    /// Catalog used to look up enum schemas by type id.
    catalog: &'a CatalogState,
    /// Indices into `table.flat_fields` of the primary-key columns, copied
    /// from `table.primary_key` when the decoder is constructed.
    pk_indices: Vec<usize>,
}

impl<'a> RowDecoder<'a> {
    /// Creates a decoder for `table`, using `catalog` to look up enum schemas.
    ///
    /// The table's primary-key index list is cloned into the decoder.
    pub(crate) fn new(table: &'a ResolvedTable<'a>, catalog: &'a CatalogState) -> Self {
        let pk_indices = table.primary_key.clone();
        Self { table, catalog, pk_indices }
    }

    /// Decodes one enum value from `buf`.
    ///
    /// Reads a `u32` variant id, looks up the enum schema for `type_id` in the
    /// catalog, then decodes each payload field of that variant in declaration
    /// order. Generic parameters in field types are substituted from
    /// `type_args` before decoding.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `TempestValue::decode` for a payload
    /// field.
    ///
    /// # Panics
    ///
    /// Panics if `buf` holds fewer than 4 bytes, if `type_id` does not name an
    /// enum schema in the catalog, if the variant id is not part of that
    /// schema, or if a payload field type cannot be resolved to a concrete
    /// type (see `resolve_type_expr`).
    fn decode_enum_value(&self, buf: &mut Bytes, type_id: u32, type_args: &[TypeExpr]) -> Result<TempestValue<'static>, RowDecodeError> {
        let variant_id = buf.get_u32();
        let enum_schema = self.catalog
            .get_type(TypeId(type_id))
            .and_then(|s| s.as_enum())
            .expect("enum schema must exist in catalog");
        let variant = enum_schema.variants
            .get(&VariantId(variant_id))
            .expect("variant id must exist in enum schema");
        let mut fields = Vec::with_capacity(variant.fields.len());
        for ty_expr in &variant.fields {
            let concrete_ty = resolve_type_expr(ty_expr, type_args);
            let val = TempestValue::decode(buf, concrete_ty)?;
            fields.push(val);
        }
        Ok(TempestValue::Enum { type_id, variant_id, fields })
    }

    /// Decodes the PK columns and HLC from a key buffer.
    ///
    /// The key is read as: one key-space byte, four skipped bytes, one
    /// lexically encoded value per primary-key column, then a `u64` HLC
    /// timestamp. The returned values are in `table.primary_key` order, which
    /// is the order they appear in the key.
    ///
    /// # Errors
    ///
    /// Returns [`RowDecodeError::InvalidKeySpace`] if the first byte is not
    /// `KeySpace::TableRow`, and propagates any error from
    /// `TempestValue::decode_lexical`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` runs out of bytes while reading the fixed-width parts
    /// of the key, or if a primary-key index is out of range for
    /// `table.flat_fields`.
    pub(crate) fn decode_key(
        &self,
        buf: &mut Bytes,
    ) -> Result<(Vec<TempestValue<'static>>, HlcTimestamp), RowDecodeError> {
        let ks = buf.get_u8();
        if ks != KeySpace::TableRow as u8 {
            return Err(RowDecodeError::InvalidKeySpace(ks));
        }
        // Skip the next 4 bytes without interpreting them.
        // NOTE(review): presumably a table identifier — confirm against the key encoder.
        buf.advance(4);

        let mut pk_values = Vec::with_capacity(self.pk_indices.len());
        for &idx in &self.pk_indices {
            let ty = self.table.flat_fields[idx].ty;
            let val = TempestValue::decode_lexical(buf, ty)?;
            pk_values.push(val);
        }

        let hlc = HlcTimestamp::from_u64(buf.get_u64());
        Ok((pk_values, hlc))
    }

    /// Decodes the non-PK columns from a value buffer.
    ///
    /// Columns are decoded in flat-field order, skipping every index listed in
    /// the primary key. Enum columns go through `decode_enum_value`; all other
    /// types are decoded with `TempestValue::decode`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `TempestValue::decode`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `decode_enum_value` when an enum
    /// column is encountered.
    pub(crate) fn decode_value(
        &self,
        buf: &mut Bytes,
    ) -> Result<Vec<TempestValue<'static>>, RowDecodeError> {
        // Upper bound on the number of non-PK columns; avoids regrowth.
        let non_pk_count = self
            .table
            .flat_fields
            .len()
            .saturating_sub(self.pk_indices.len());
        let mut values = Vec::with_capacity(non_pk_count);
        for (idx, ff) in self.table.flat_fields.iter().enumerate() {
            if !self.pk_indices.contains(&idx) {
                let val = match ff.ty {
                    TempestType::Enum(type_id) => self.decode_enum_value(buf, type_id, &ff.type_args)?,
                    other => TempestValue::decode(buf, other)?,
                };
                values.push(val);
            }
        }
        Ok(values)
    }

    /// Decodes a full row, returning all column values in flat field order.
    ///
    /// The HLC timestamp decoded from the key is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error from `decode_key` or `decode_value`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `decode_key` and `decode_value`.
    #[instrument(level = "trace")]
    pub(crate) fn decode_row(
        &self,
        key_buf: &mut Bytes,
        value_buf: &mut Bytes,
    ) -> Result<Vec<TempestValue<'static>>, RowDecodeError> {
        let (pk_vals, _hlc) = self.decode_key(key_buf)?;
        let val_vals = self.decode_value(value_buf)?;

        // `decode_key` yields values in primary-key order, which need not be
        // ascending flat-field order (e.g. a key declared over fields [2, 0]).
        // Place each PK value at its own field index rather than handing them
        // out sequentially, so every value lands in the right column.
        let field_count = self.table.flat_fields.len();
        let mut slots: Vec<Option<TempestValue<'static>>> =
            std::iter::repeat_with(|| None).take(field_count).collect();
        for (&idx, pk_val) in self.pk_indices.iter().zip(pk_vals) {
            slots[idx] = Some(pk_val);
        }

        // Remaining slots are the non-PK columns, which `decode_value`
        // produced in flat-field order.
        let mut val_iter = val_vals.into_iter();
        let row = slots
            .into_iter()
            .map(|slot| match slot {
                Some(pk_val) => pk_val,
                None => val_iter
                    .next()
                    .expect("decode_value yields one value per non-PK column"),
            })
            .collect();
        Ok(row)
    }
}

/// Resolves an enum payload field type to a concrete `TempestType`.
///
/// A primitive type is returned as is. A generic parameter is replaced by the
/// argument at its index in `args`, which must itself be a primitive type.
///
/// # Panics
///
/// Panics if `ty` (or the argument substituted for it) is a `Ref` type, if the
/// substituted argument is another generic parameter, or if the generic
/// parameter index is out of range for `args`.
fn resolve_type_expr(ty: &TypeExpr, args: &[TypeExpr]) -> TempestType {
    // Handle the non-generic cases up front; only a generic parameter needs
    // a lookup in `args`.
    let param = match ty {
        TypeExpr::Primitive(prim) => return *prim,
        TypeExpr::Ref(..) => panic!("ref type in enum payload field not supported"),
        TypeExpr::GenericParam(param) => param,
    };

    let Some(arg) = args.get(*param as usize) else {
        panic!("generic param index {} out of range", param);
    };

    // The substituted argument must already be concrete.
    match arg {
        TypeExpr::Primitive(prim) => *prim,
        TypeExpr::GenericParam(nested) => {
            panic!("nested generic param {} in enum payload", nested)
        }
        TypeExpr::Ref(..) => panic!("ref type in enum payload field not supported"),
    }
}