use bytes::{BufMut, Bytes, BytesMut};
use crate::error::RelError;
use crate::schema::{FieldType, FieldValue, Record, Schema};
pub fn encode_value(record: &Record, schema: &Schema) -> Result<Bytes, RelError> {
let mut buf = BytesMut::new();
for (i, field) in schema.fields.iter().enumerate() {
if field.is_key {
continue;
}
let v = record
.values
.get(i)
.ok_or_else(|| RelError::MissingField(field.name.clone()))?;
encode_field_value(&mut buf, &field.field_type, v)?;
}
Ok(buf.freeze())
}
pub fn encode_key(record: &Record, schema: &Schema) -> Result<Bytes, RelError> {
let mut buf = BytesMut::new();
for (i, field) in schema.fields.iter().enumerate() {
if !field.is_key {
continue;
}
let v = record
.values
.get(i)
.ok_or_else(|| RelError::MissingField(field.name.clone()))?;
encode_field_key(&mut buf, &field.field_type, v)?;
}
Ok(buf.freeze())
}
pub fn encode_field_key(
buf: &mut BytesMut,
ft: &FieldType,
v: &FieldValue,
) -> Result<(), RelError> {
match (ft, v) {
(FieldType::U8, FieldValue::U8(n)) => buf.put_u8(*n),
(FieldType::U16, FieldValue::U16(n)) => buf.put_u16(*n), (FieldType::U32, FieldValue::U32(n)) => buf.put_u32(*n),
(FieldType::U64, FieldValue::U64(n)) => buf.put_u64(*n),
(FieldType::U128, FieldValue::U128(n)) => buf.put_u128(*n),
(FieldType::I64, FieldValue::I64(n)) => {
buf.put_u64((*n as u64) ^ 0x8000_0000_0000_0000);
}
(FieldType::Bool, FieldValue::Bool(b)) => buf.put_u8(*b as u8),
(FieldType::Bytes(_), FieldValue::Bytes(b)) => buf.put_slice(b),
(FieldType::VarBytes, FieldValue::Bytes(b)) => {
buf.put_u32(b.len() as u32);
buf.put_slice(b);
}
(FieldType::Address, FieldValue::Bytes(b)) => buf.put_slice(b),
(FieldType::Hash, FieldValue::Bytes(b)) => buf.put_slice(b),
(_, FieldValue::Null) => {}
_ => {
return Err(RelError::Codec(format!(
"key type mismatch: {:?} vs {:?}",
ft.type_name(),
v.type_name()
)))
}
}
Ok(())
}
pub fn encode_field_value(
buf: &mut BytesMut,
ft: &FieldType,
v: &FieldValue,
) -> Result<(), RelError> {
match (ft, v) {
(FieldType::U8, FieldValue::U8(n)) => buf.put_u8(*n),
(FieldType::U16, FieldValue::U16(n)) => buf.put_u16_le(*n),
(FieldType::U32, FieldValue::U32(n)) => buf.put_u32_le(*n),
(FieldType::U64, FieldValue::U64(n)) => buf.put_u64_le(*n),
(FieldType::U128, FieldValue::U128(n)) => buf.put_u128_le(*n),
(FieldType::I64, FieldValue::I64(n)) => buf.put_i64_le(*n),
(FieldType::Bool, FieldValue::Bool(b)) => buf.put_u8(*b as u8),
(FieldType::Bytes(_), FieldValue::Bytes(b)) => buf.put_slice(b),
(FieldType::VarBytes, FieldValue::Bytes(b)) => {
buf.put_u32_le(b.len() as u32);
buf.put_slice(b);
}
(FieldType::Address, FieldValue::Bytes(b)) => buf.put_slice(b),
(FieldType::Hash, FieldValue::Bytes(b)) => buf.put_slice(b),
(_, FieldValue::Null) => {}
_ => {
return Err(RelError::Codec(format!(
"value type mismatch: {:?} vs {:?}",
ft.type_name(),
v.type_name()
)))
}
}
Ok(())
}
pub fn decode_value(data: &[u8], schema: &Schema) -> Result<Vec<FieldValue>, RelError> {
let mut pos = 0usize;
let mut values = Vec::with_capacity(schema.fields.len());
for field in &schema.fields {
if field.is_key {
values.push(FieldValue::Null); continue;
}
let (v, consumed) = decode_one(data, pos, &field.field_type)
.map_err(|e| RelError::Codec(format!("field '{}': {}", field.name, e)))?;
pos += consumed;
values.push(v);
}
Ok(values)
}
pub fn decode_key(data: &[u8], schema: &Schema) -> Result<Vec<(usize, FieldValue)>, RelError> {
let mut pos = 0usize;
let mut result = Vec::new();
for (i, field) in schema.fields.iter().enumerate() {
if !field.is_key {
continue;
}
let (v, consumed) = decode_one_key(data, pos, &field.field_type)
.map_err(|e| RelError::Codec(format!("key field '{}': {}", field.name, e)))?;
pos += consumed;
result.push((i, v));
}
Ok(result)
}
fn read_bytes(data: &[u8], pos: usize, n: usize) -> Result<&[u8], String> {
if pos + n > data.len() {
return Err(format!(
"need {} bytes at pos {}, have {}",
n,
pos,
data.len()
));
}
Ok(&data[pos..pos + n])
}
fn decode_one(data: &[u8], pos: usize, ft: &FieldType) -> Result<(FieldValue, usize), String> {
match ft {
FieldType::U8 => Ok((FieldValue::U8(data[pos]), 1)),
FieldType::U16 => {
let b = read_bytes(data, pos, 2)?;
Ok((
FieldValue::U16(u16::from_le_bytes(b.try_into().unwrap())),
2,
))
}
FieldType::U32 => {
let b = read_bytes(data, pos, 4)?;
Ok((
FieldValue::U32(u32::from_le_bytes(b.try_into().unwrap())),
4,
))
}
FieldType::U64 => {
let b = read_bytes(data, pos, 8)?;
Ok((
FieldValue::U64(u64::from_le_bytes(b.try_into().unwrap())),
8,
))
}
FieldType::U128 => {
let b = read_bytes(data, pos, 16)?;
Ok((
FieldValue::U128(u128::from_le_bytes(b.try_into().unwrap())),
16,
))
}
FieldType::I64 => {
let b = read_bytes(data, pos, 8)?;
Ok((
FieldValue::I64(i64::from_le_bytes(b.try_into().unwrap())),
8,
))
}
FieldType::Bool => Ok((FieldValue::Bool(data[pos] != 0), 1)),
FieldType::Bytes(n) => {
let b = read_bytes(data, pos, *n)?;
Ok((FieldValue::Bytes(b.to_vec()), *n))
}
FieldType::Address | FieldType::Hash => {
let b = read_bytes(data, pos, 32)?;
Ok((FieldValue::Bytes(b.to_vec()), 32))
}
FieldType::VarBytes => {
let lb = read_bytes(data, pos, 4)?;
let len = u32::from_le_bytes(lb.try_into().unwrap()) as usize;
let b = read_bytes(data, pos + 4, len)?;
Ok((FieldValue::Bytes(b.to_vec()), 4 + len))
}
}
}
fn decode_one_key(data: &[u8], pos: usize, ft: &FieldType) -> Result<(FieldValue, usize), String> {
match ft {
FieldType::U8 => Ok((FieldValue::U8(data[pos]), 1)),
FieldType::U16 => {
let b = read_bytes(data, pos, 2)?;
Ok((
FieldValue::U16(u16::from_be_bytes(b.try_into().unwrap())),
2,
))
}
FieldType::U32 => {
let b = read_bytes(data, pos, 4)?;
Ok((
FieldValue::U32(u32::from_be_bytes(b.try_into().unwrap())),
4,
))
}
FieldType::U64 => {
let b = read_bytes(data, pos, 8)?;
Ok((
FieldValue::U64(u64::from_be_bytes(b.try_into().unwrap())),
8,
))
}
FieldType::U128 => {
let b = read_bytes(data, pos, 16)?;
Ok((
FieldValue::U128(u128::from_be_bytes(b.try_into().unwrap())),
16,
))
}
FieldType::I64 => {
let b = read_bytes(data, pos, 8)?;
let raw = u64::from_be_bytes(b.try_into().unwrap()) ^ 0x8000_0000_0000_0000;
Ok((FieldValue::I64(raw as i64), 8))
}
other => decode_one(data, pos, other),
}
}