use bytes::{Buf, Bytes};
use derive_more::{Display, Error, From};
use tempest_core::encoding::{LexicalDecodeError, RawDecodeError};
use crate::{
base::KeySpace,
catalog::{
CatalogState,
schema::{TypeExpr, TypeId, VariantId},
},
ctrl::hlc::HlcTimestamp,
row::resolved::ResolvedTable,
types::{TempestType, TempestValue},
};
/// Errors returned by the row decoding routines in this module.
#[derive(Debug, Display, From, Error)]
pub enum RowDecodeError {
/// The first byte of a key did not match the expected key-space tag.
/// Carries the byte that was actually read. `From<u8>` is deliberately
/// skipped so a bare `u8` is never converted into this variant implicitly.
#[from(skip)]
#[display("invalid key space: {}", _0)]
InvalidKeySpace(#[error(not(source))] u8),
/// Wraps a `LexicalDecodeError` from `tempest_core::encoding`.
/// NOTE(review): presumably produced by `TempestValue::decode_lexical`
/// when decoding key columns — confirm against its signature.
LexicalDecodeError(LexicalDecodeError),
/// Wraps a `RawDecodeError` from `tempest_core::encoding`.
/// NOTE(review): presumably produced by `TempestValue::decode` when
/// decoding value columns — confirm against its signature.
RawDecodeError(RawDecodeError),
}
/// Decodes stored key/value byte buffers back into column values for a
/// single resolved table.
#[derive(Debug)]
pub(crate) struct RowDecoder<'a> {
/// Resolved table whose `flat_fields` supply the column types to decode.
table: &'a ResolvedTable<'a>,
/// Catalog used to look up enum schemas by type id.
catalog: &'a CatalogState,
/// Indices into `table.flat_fields` of the primary-key columns, copied
/// from `table.primary_key` when the decoder is constructed.
pk_indices: Vec<usize>,
}
impl<'a> RowDecoder<'a> {
    /// Creates a decoder for `table`, resolving enum schemas through `catalog`.
    ///
    /// The table's `primary_key` index list is copied into the decoder.
    pub(crate) fn new(table: &'a ResolvedTable<'a>, catalog: &'a CatalogState) -> Self {
        let pk_indices = table.primary_key.clone();
        Self { table, catalog, pk_indices }
    }

    /// Decodes one enum value from `buf`.
    ///
    /// Reads a `u32` variant id, looks the enum schema up in the catalog by
    /// `type_id`, then decodes each payload field of that variant in order,
    /// substituting generic parameters from `type_args`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `TempestValue::decode` for a payload
    /// field.
    ///
    /// # Panics
    ///
    /// Panics if `buf` has fewer than 4 bytes remaining, if the catalog has no
    /// enum schema for `type_id`, if the variant id is unknown to that schema,
    /// or if a payload field type cannot be resolved (see `resolve_type_expr`).
    fn decode_enum_value(
        &self,
        buf: &mut Bytes,
        type_id: u32,
        type_args: &[TypeExpr],
    ) -> Result<TempestValue<'static>, RowDecodeError> {
        let variant_id = buf.get_u32();
        let enum_schema = self
            .catalog
            .get_type(TypeId(type_id))
            .and_then(|s| s.as_enum())
            .expect("enum schema must exist in catalog");
        let variant = enum_schema
            .variants
            .get(&VariantId(variant_id))
            .expect("variant id must exist in enum schema");

        let mut fields = Vec::with_capacity(variant.fields.len());
        for ty_expr in &variant.fields {
            let concrete_ty = resolve_type_expr(ty_expr, type_args);
            let val = TempestValue::decode(buf, concrete_ty)?;
            fields.push(val);
        }
        Ok(TempestValue::Enum { type_id, variant_id, fields })
    }

    /// Decodes a row key into its primary-key values and trailing timestamp.
    ///
    /// The returned values are in `pk_indices` order, i.e. the order in which
    /// the key columns appear in the encoded key, which is not necessarily
    /// ascending column order.
    ///
    /// # Errors
    ///
    /// Returns `RowDecodeError::InvalidKeySpace` if the first byte is not
    /// `KeySpace::TableRow`, and propagates errors from
    /// `TempestValue::decode_lexical`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` runs out of bytes while reading the key-space byte, the
    /// 4 skipped bytes, or the trailing `u64`, or if a primary-key index is out
    /// of range for `flat_fields`.
    pub(crate) fn decode_key(
        &self,
        buf: &mut Bytes,
    ) -> Result<(Vec<TempestValue<'static>>, HlcTimestamp), RowDecodeError> {
        let ks = buf.get_u8();
        if ks != KeySpace::TableRow as u8 {
            return Err(RowDecodeError::InvalidKeySpace(ks));
        }
        // Skip a fixed 4-byte field that follows the key-space tag.
        // NOTE(review): presumably the table id — confirm against the encoder.
        buf.advance(4);

        let mut pk_values = Vec::with_capacity(self.pk_indices.len());
        for &idx in &self.pk_indices {
            let ty = self.table.flat_fields[idx].ty;
            let val = TempestValue::decode_lexical(buf, ty)?;
            pk_values.push(val);
        }
        let hlc = HlcTimestamp::from_u64(buf.get_u64());
        Ok((pk_values, hlc))
    }

    /// Decodes the non-primary-key columns of a row from `buf`.
    ///
    /// Columns are visited in `flat_fields` order, skipping every index listed
    /// in `pk_indices`. Enum-typed columns go through `decode_enum_value`; all
    /// other types go through `TempestValue::decode`.
    ///
    /// # Errors
    ///
    /// Propagates decode errors from either path.
    pub(crate) fn decode_value(
        &self,
        buf: &mut Bytes,
    ) -> Result<Vec<TempestValue<'static>>, RowDecodeError> {
        // Exactly the non-key columns are emitted, so size the output up front.
        let non_key_count = self
            .table
            .flat_fields
            .len()
            .saturating_sub(self.pk_indices.len());
        let mut values = Vec::with_capacity(non_key_count);

        for (idx, ff) in self.table.flat_fields.iter().enumerate() {
            if self.pk_indices.contains(&idx) {
                continue;
            }
            let val = match ff.ty {
                TempestType::Enum(type_id) => {
                    self.decode_enum_value(buf, type_id, &ff.type_args)?
                }
                other => TempestValue::decode(buf, other)?,
            };
            values.push(val);
        }
        Ok(values)
    }

    /// Decodes a full row, returning one value per entry of `flat_fields`, in
    /// `flat_fields` order.
    ///
    /// Key columns come from `key_buf`, the remaining columns from
    /// `value_buf`. The timestamp decoded from the key is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error from `decode_key` or `decode_value`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `decode_key` and `decode_value`.
    #[instrument(level = "trace")]
    pub(crate) fn decode_row(
        &self,
        key_buf: &mut Bytes,
        value_buf: &mut Bytes,
    ) -> Result<Vec<TempestValue<'static>>, RowDecodeError> {
        let (pk_vals, _hlc) = self.decode_key(key_buf)?;
        let val_vals = self.decode_value(value_buf)?;

        // `decode_key` yields values in key order (`pk_indices` order), which
        // need not be ascending column order. Place each key value at its own
        // column index instead of handing them out while walking columns in
        // ascending order, which would misplace values for a key such as
        // `[2, 0]`.
        let mut slots: Vec<Option<TempestValue<'static>>> =
            self.table.flat_fields.iter().map(|_| None).collect();
        for (&idx, pk_val) in self.pk_indices.iter().zip(pk_vals) {
            slots[idx] = Some(pk_val);
        }

        // Every slot still empty is a non-key column; `decode_value` emitted
        // those in ascending column order, so fill them in sequence.
        let mut val_iter = val_vals.into_iter();
        let row = slots
            .into_iter()
            .map(|slot| {
                slot.unwrap_or_else(|| {
                    val_iter
                        .next()
                        .expect("decode_value yields one value per non-key column")
                })
            })
            .collect();
        Ok(row)
    }
}
/// Resolves an enum payload field's type expression to a concrete
/// `TempestType`.
///
/// A primitive expression resolves to itself. A generic parameter is looked
/// up by position in `args` and must be bound to a primitive.
///
/// # Panics
///
/// Panics if `ty` or the bound argument is a `Ref`, if the generic parameter
/// index is out of range for `args`, or if the bound argument is itself a
/// generic parameter.
fn resolve_type_expr(ty: &TypeExpr, args: &[TypeExpr]) -> TempestType {
    // Handle the two terminal shapes first; only a generic parameter needs a lookup.
    let param = match ty {
        TypeExpr::Primitive(prim) => return *prim,
        TypeExpr::Ref(..) => panic!("ref type in enum payload field not supported"),
        TypeExpr::GenericParam(param) => param,
    };

    let bound = match args.get(*param as usize) {
        Some(bound) => bound,
        None => panic!("generic param index {} out of range", param),
    };

    match bound {
        TypeExpr::Primitive(prim) => *prim,
        TypeExpr::GenericParam(inner) => {
            panic!("nested generic param {} in enum payload", inner)
        }
        TypeExpr::Ref(..) => panic!("ref type in enum payload field not supported"),
    }
}