// donadb-rel 0.1.2
//
// DonaDB Relational — typed schemas, secondary indexes, and relational scans on top of DonaDB. Built for the TruthLinked blockchain.
// Documentation
// codec.rs — compact binary encoding for typed FieldValues.
//
// Wire format for a Record (field values written in schema order):
//   For each field:
//     U8:      1 byte, raw
//     U16:     2 bytes, little-endian
//     U32:     4 bytes, little-endian
//     U64:     8 bytes, little-endian
//     U128:   16 bytes, little-endian
//     I64:     8 bytes, little-endian (two's complement)
//     Bool:    1 byte, 0x00=false 0x01=true
//     Bytes(n): n bytes, raw
//     Address: 32 bytes, raw
//     Hash:    32 bytes, raw
//     VarBytes: u32 length (4 bytes LE) followed by `length` bytes
//     Null:    0 bytes (schema knows which field is nullable)
//
// Key encoding (for DonaDB key bytes):
//   Key fields are encoded in schema order and concatenated.
//   This makes range scans over compound keys work correctly:
//   the lexicographic byte order of the encoded key matches the
//   natural ordering of fixed-width field values.
//   U* fields are encoded big-endian for key ordering correctness.
//   I64 is encoded big-endian with the sign bit flipped, so negative
//   values sort before positive ones.
//   VarBytes keys are prefixed with 4-byte BE length; as a consequence
//   they sort by length first and by content second.

use bytes::{BufMut, Bytes, BytesMut};

use crate::error::RelError;
use crate::schema::{FieldType, FieldValue, Record, Schema};

/// Serialize the non-key fields of `record` into the stored DonaDB value.
///
/// Fields are visited in schema order; every field flagged `is_key` is
/// skipped. Each remaining field is looked up in `record.values` at the
/// same index it has in `schema.fields` and appended with
/// `encode_field_value`.
///
/// # Errors
///
/// * `RelError::MissingField` if `record.values` has no entry at a value
///   field's index.
/// * Whatever `encode_field_value` returns for a field it cannot encode.
pub fn encode_value(record: &Record, schema: &Schema) -> Result<Bytes, RelError> {
    let mut out = BytesMut::new();
    let value_fields = schema
        .fields
        .iter()
        .enumerate()
        .filter(|(_, field)| !field.is_key);
    for (idx, field) in value_fields {
        match record.values.get(idx) {
            Some(value) => encode_field_value(&mut out, &field.field_type, value)?,
            None => return Err(RelError::MissingField(field.name.clone())),
        }
    }
    Ok(out.freeze())
}

/// Serialize the key fields of `record` into the bytes used as the DonaDB key.
///
/// Fields are visited in schema order; only fields flagged `is_key` are
/// emitted. Each one is looked up in `record.values` at the same index it
/// has in `schema.fields` and appended with `encode_field_key`, which uses
/// the order-preserving (big-endian) key encoding.
///
/// # Errors
///
/// * `RelError::MissingField` if `record.values` has no entry at a key
///   field's index.
/// * Whatever `encode_field_key` returns for a field it cannot encode.
pub fn encode_key(record: &Record, schema: &Schema) -> Result<Bytes, RelError> {
    let mut out = BytesMut::new();
    let key_fields = schema
        .fields
        .iter()
        .enumerate()
        .filter(|(_, field)| field.is_key);
    for (idx, field) in key_fields {
        match record.values.get(idx) {
            Some(value) => encode_field_key(&mut out, &field.field_type, value)?,
            None => return Err(RelError::MissingField(field.name.clone())),
        }
    }
    Ok(out.freeze())
}

/// Encode a single FieldValue for KEY use (big-endian numerics for range ordering).
pub fn encode_field_key(
    buf: &mut BytesMut,
    ft: &FieldType,
    v: &FieldValue,
) -> Result<(), RelError> {
    match (ft, v) {
        (FieldType::U8, FieldValue::U8(n)) => buf.put_u8(*n),
        (FieldType::U16, FieldValue::U16(n)) => buf.put_u16(*n), // big-endian
        (FieldType::U32, FieldValue::U32(n)) => buf.put_u32(*n),
        (FieldType::U64, FieldValue::U64(n)) => buf.put_u64(*n),
        (FieldType::U128, FieldValue::U128(n)) => buf.put_u128(*n),
        (FieldType::I64, FieldValue::I64(n)) => {
            // Flip sign bit so negative < positive in byte order
            buf.put_u64((*n as u64) ^ 0x8000_0000_0000_0000);
        }
        (FieldType::Bool, FieldValue::Bool(b)) => buf.put_u8(*b as u8),
        (FieldType::Bytes(_), FieldValue::Bytes(b)) => buf.put_slice(b),
        (FieldType::VarBytes, FieldValue::Bytes(b)) => {
            buf.put_u32(b.len() as u32);
            buf.put_slice(b);
        }
        (FieldType::Address, FieldValue::Bytes(b)) => buf.put_slice(b),
        (FieldType::Hash, FieldValue::Bytes(b)) => buf.put_slice(b),
        (_, FieldValue::Null) => {}
        _ => {
            return Err(RelError::Codec(format!(
                "key type mismatch: {:?} vs {:?}",
                ft.type_name(),
                v.type_name()
            )))
        }
    }
    Ok(())
}

pub fn encode_field_value(
    buf: &mut BytesMut,
    ft: &FieldType,
    v: &FieldValue,
) -> Result<(), RelError> {
    match (ft, v) {
        (FieldType::U8, FieldValue::U8(n)) => buf.put_u8(*n),
        (FieldType::U16, FieldValue::U16(n)) => buf.put_u16_le(*n),
        (FieldType::U32, FieldValue::U32(n)) => buf.put_u32_le(*n),
        (FieldType::U64, FieldValue::U64(n)) => buf.put_u64_le(*n),
        (FieldType::U128, FieldValue::U128(n)) => buf.put_u128_le(*n),
        (FieldType::I64, FieldValue::I64(n)) => buf.put_i64_le(*n),
        (FieldType::Bool, FieldValue::Bool(b)) => buf.put_u8(*b as u8),
        (FieldType::Bytes(_), FieldValue::Bytes(b)) => buf.put_slice(b),
        (FieldType::VarBytes, FieldValue::Bytes(b)) => {
            buf.put_u32_le(b.len() as u32);
            buf.put_slice(b);
        }
        (FieldType::Address, FieldValue::Bytes(b)) => buf.put_slice(b),
        (FieldType::Hash, FieldValue::Bytes(b)) => buf.put_slice(b),
        (_, FieldValue::Null) => {}
        _ => {
            return Err(RelError::Codec(format!(
                "value type mismatch: {:?} vs {:?}",
                ft.type_name(),
                v.type_name()
            )))
        }
    }
    Ok(())
}

/// Rebuild the field list of a record from its stored value bytes.
///
/// The returned vector has one entry per schema field, in schema order.
/// Non-key fields are decoded sequentially from `data`; every key field
/// gets a `FieldValue::Null` placeholder that the caller is expected to
/// overwrite with the value decoded from the DonaDB key.
///
/// # Errors
///
/// Returns `RelError::Codec`, prefixed with the offending field's name,
/// when a field cannot be decoded from the remaining bytes.
pub fn decode_value(data: &[u8], schema: &Schema) -> Result<Vec<FieldValue>, RelError> {
    let mut cursor = 0usize;
    let mut out = Vec::with_capacity(schema.fields.len());
    for field in schema.fields.iter() {
        let decoded = if field.is_key {
            // Placeholder; the caller fills this slot from the key.
            FieldValue::Null
        } else {
            match decode_one(data, cursor, &field.field_type) {
                Ok((value, used)) => {
                    cursor += used;
                    value
                }
                Err(e) => {
                    return Err(RelError::Codec(format!("field '{}': {}", field.name, e)));
                }
            }
        };
        out.push(decoded);
    }
    Ok(out)
}

/// Split stored key bytes back into the individual key field values.
///
/// Key fields are decoded sequentially from `data` in schema order. Each
/// result is paired with the field's index in `schema.fields`, so the
/// caller can place it at the right position of a full record.
///
/// # Errors
///
/// Returns `RelError::Codec`, prefixed with the offending key field's
/// name, when a field cannot be decoded from the remaining bytes.
pub fn decode_key(data: &[u8], schema: &Schema) -> Result<Vec<(usize, FieldValue)>, RelError> {
    let mut cursor = 0usize;
    let mut pairs = Vec::new();
    let key_fields = schema
        .fields
        .iter()
        .enumerate()
        .filter(|(_, field)| field.is_key);
    for (idx, field) in key_fields {
        match decode_one_key(data, cursor, &field.field_type) {
            Ok((value, used)) => {
                cursor += used;
                pairs.push((idx, value));
            }
            Err(e) => {
                return Err(RelError::Codec(format!(
                    "key field '{}': {}",
                    field.name, e
                )));
            }
        }
    }
    Ok(pairs)
}

/// Borrow exactly `n` bytes of `data` starting at offset `pos`.
///
/// Returns an error message (rather than panicking) when the requested
/// range does not fit inside `data`. The end offset is computed with
/// `checked_add`, so an oversized `n` — for example a corrupted length
/// prefix — is reported as an error instead of overflowing `usize`.
fn read_bytes(data: &[u8], pos: usize, n: usize) -> Result<&[u8], String> {
    // `None` covers both arithmetic overflow and running past the buffer.
    let end = pos.checked_add(n).filter(|&end| end <= data.len());
    match end {
        Some(end) => Ok(&data[pos..end]),
        None => Err(format!(
            "need {} bytes at pos {}, have {}",
            n,
            pos,
            data.len()
        )),
    }
}

/// Decode one value-encoded field of type `ft` from `data` at offset `pos`.
///
/// Integers are read little-endian, `Bool` is one byte (any non-zero byte
/// is `true`), `Bytes(n)` is `n` raw bytes, `Address`/`Hash` are 32 raw
/// bytes, and `VarBytes` is a 4-byte little-endian length followed by that
/// many bytes.
///
/// Returns the decoded value together with the number of bytes consumed.
/// Every read is bounds-checked through `read_bytes`, so truncated input
/// yields an `Err` with a description instead of a panic.
fn decode_one(data: &[u8], pos: usize, ft: &FieldType) -> Result<(FieldValue, usize), String> {
    match ft {
        FieldType::U8 => {
            let b = read_bytes(data, pos, 1)?;
            Ok((FieldValue::U8(b[0]), 1))
        }
        FieldType::U16 => {
            let b = read_bytes(data, pos, 2)?;
            Ok((
                FieldValue::U16(u16::from_le_bytes(
                    b.try_into().expect("read_bytes returned exactly 2 bytes"),
                )),
                2,
            ))
        }
        FieldType::U32 => {
            let b = read_bytes(data, pos, 4)?;
            Ok((
                FieldValue::U32(u32::from_le_bytes(
                    b.try_into().expect("read_bytes returned exactly 4 bytes"),
                )),
                4,
            ))
        }
        FieldType::U64 => {
            let b = read_bytes(data, pos, 8)?;
            Ok((
                FieldValue::U64(u64::from_le_bytes(
                    b.try_into().expect("read_bytes returned exactly 8 bytes"),
                )),
                8,
            ))
        }
        FieldType::U128 => {
            let b = read_bytes(data, pos, 16)?;
            Ok((
                FieldValue::U128(u128::from_le_bytes(
                    b.try_into().expect("read_bytes returned exactly 16 bytes"),
                )),
                16,
            ))
        }
        FieldType::I64 => {
            let b = read_bytes(data, pos, 8)?;
            Ok((
                FieldValue::I64(i64::from_le_bytes(
                    b.try_into().expect("read_bytes returned exactly 8 bytes"),
                )),
                8,
            ))
        }
        FieldType::Bool => {
            let b = read_bytes(data, pos, 1)?;
            Ok((FieldValue::Bool(b[0] != 0), 1))
        }
        FieldType::Bytes(n) => {
            let b = read_bytes(data, pos, *n)?;
            Ok((FieldValue::Bytes(b.to_vec()), *n))
        }
        FieldType::Address | FieldType::Hash => {
            let b = read_bytes(data, pos, 32)?;
            Ok((FieldValue::Bytes(b.to_vec()), 32))
        }
        FieldType::VarBytes => {
            let lb = read_bytes(data, pos, 4)?;
            let len = u32::from_le_bytes(
                lb.try_into().expect("read_bytes returned exactly 4 bytes"),
            ) as usize;
            // `pos + 4` cannot overflow: the read above proved it is <= data.len().
            let b = read_bytes(data, pos + 4, len)?;
            Ok((FieldValue::Bytes(b.to_vec()), 4 + len))
        }
    }
}

/// Decode one key-encoded field of type `ft` from `data` at offset `pos`.
///
/// This mirrors the key encoding: unsigned integers are big-endian, `I64`
/// is big-endian with the sign bit flipped back, `Bool` is one byte, and
/// `VarBytes` carries a 4-byte BIG-endian length prefix (unlike the value
/// encoding, which uses little-endian). Fixed-width byte fields
/// (`Bytes(n)`, `Address`, `Hash`) are encoded identically for keys and
/// values and are delegated to `decode_one`.
///
/// Returns the decoded value together with the number of bytes consumed.
/// Reads performed here are bounds-checked through `read_bytes`, so
/// truncated input yields an `Err` with a description instead of a panic.
fn decode_one_key(data: &[u8], pos: usize, ft: &FieldType) -> Result<(FieldValue, usize), String> {
    match ft {
        FieldType::U8 => {
            let b = read_bytes(data, pos, 1)?;
            Ok((FieldValue::U8(b[0]), 1))
        }
        FieldType::U16 => {
            let b = read_bytes(data, pos, 2)?;
            Ok((
                FieldValue::U16(u16::from_be_bytes(
                    b.try_into().expect("read_bytes returned exactly 2 bytes"),
                )),
                2,
            ))
        }
        FieldType::U32 => {
            let b = read_bytes(data, pos, 4)?;
            Ok((
                FieldValue::U32(u32::from_be_bytes(
                    b.try_into().expect("read_bytes returned exactly 4 bytes"),
                )),
                4,
            ))
        }
        FieldType::U64 => {
            let b = read_bytes(data, pos, 8)?;
            Ok((
                FieldValue::U64(u64::from_be_bytes(
                    b.try_into().expect("read_bytes returned exactly 8 bytes"),
                )),
                8,
            ))
        }
        FieldType::U128 => {
            let b = read_bytes(data, pos, 16)?;
            Ok((
                FieldValue::U128(u128::from_be_bytes(
                    b.try_into().expect("read_bytes returned exactly 16 bytes"),
                )),
                16,
            ))
        }
        FieldType::I64 => {
            let b = read_bytes(data, pos, 8)?;
            // Undo the sign-bit flip applied when the key was encoded.
            let raw = u64::from_be_bytes(
                b.try_into().expect("read_bytes returned exactly 8 bytes"),
            ) ^ 0x8000_0000_0000_0000;
            Ok((FieldValue::I64(raw as i64), 8))
        }
        FieldType::Bool => {
            let b = read_bytes(data, pos, 1)?;
            Ok((FieldValue::Bool(b[0] != 0), 1))
        }
        FieldType::VarBytes => {
            // Key encoding writes the length prefix big-endian, so it must
            // be read big-endian here rather than via the value decoder.
            let lb = read_bytes(data, pos, 4)?;
            let len = u32::from_be_bytes(
                lb.try_into().expect("read_bytes returned exactly 4 bytes"),
            ) as usize;
            // `pos + 4` cannot overflow: the read above proved it is <= data.len().
            let b = read_bytes(data, pos + 4, len)?;
            Ok((FieldValue::Bytes(b.to_vec()), 4 + len))
        }
        // Fixed-width byte fields: same as value encoding
        other => decode_one(data, pos, other),
    }
}