use std::{borrow::Cow, collections::BTreeMap};
use super::{decoder::*, encoder::*};
use crate::{
base::{EngineComparer, KeySpace},
catalog::{CatalogState, schema::{FieldDef, FieldId, FlatField, TableId, TypeExpr}},
ctrl::hlc::HlcTimestamp,
row::resolved::ResolvedTable,
types::{TempestType, TempestValue},
};
use bytes::BytesMut;
use tempest_kv::base::Comparer;
/// Builds the pieces of the two-column fixture table used by every test here.
///
/// Returns, in order: the table id (`TableId(1)`), the field definitions keyed
/// by `FieldId` (`id: Int64`, `name: String`), an empty list of generic
/// arguments, the primary-key list (`[0]`), and the flattened field list in
/// the same `id`, `name` order.
fn make_resolved() -> (
    TableId,
    BTreeMap<FieldId, FieldDef>,
    Vec<TypeExpr>,
    Vec<usize>,
    Vec<FlatField>,
) {
    let fields = BTreeMap::from([
        (
            FieldId(0),
            FieldDef {
                name: "id".into(),
                ty: TypeExpr::Primitive(TempestType::Int64),
            },
        ),
        (
            FieldId(1),
            FieldDef {
                name: "name".into(),
                ty: TypeExpr::Primitive(TempestType::String),
            },
        ),
    ]);

    let flat_fields = vec![
        FlatField {
            name: "id".into(),
            ty: TempestType::Int64,
            type_args: vec![],
        },
        FlatField {
            name: "name".into(),
            ty: TempestType::String,
            type_args: vec![],
        },
    ];

    // The fixture table is not generic, so it carries no generic arguments.
    let generic_args = Vec::new();

    (TableId(1), fields, generic_args, vec![0usize], flat_fields)
}
/// Encodes `row` at timestamp `hlc` against the fixture table from
/// `make_resolved` and returns the freshly written `(key, value)` buffers.
fn encode(row: &[TempestValue<'_>], hlc: HlcTimestamp) -> (BytesMut, BytesMut) {
    let (id, fields, generic_args, primary_key, flat_fields) = make_resolved();
    let table = ResolvedTable {
        id,
        fields: &fields,
        generic_args: &generic_args,
        primary_key,
        flat_fields,
    };

    // Both buffers start empty; the encoder appends into them.
    let (mut key, mut value) = (BytesMut::new(), BytesMut::new());
    RowEncoder::new(&table).encode_row(row, hlc, &mut key, &mut value);
    (key, value)
}
/// Round-trips several rows through `RowEncoder` and `RowDecoder` and checks
/// that decoding yields exactly the values that were encoded.
///
/// The sample set includes a name with embedded NUL bytes. Each row is encoded
/// with its own timestamp (0, 10, 20, ...).
#[test]
fn test_encode_decode_row() {
    let rows = [
        (1, "john"),
        (6, "vimthusiast"),
        (7, "jam06452"),
        (24, "bob"),
        (42, "contai\0ns\0nu\0ll"),
    ]
    .map(|(id, name)| {
        (
            TempestValue::Int64(id),
            TempestValue::String(Cow::Borrowed(name)),
        )
    });

    let (table_id, fields, generic_args, primary_key, flat_fields) = make_resolved();
    let resolved = ResolvedTable {
        id: table_id,
        fields: &fields,
        generic_args: &generic_args,
        primary_key,
        flat_fields,
    };
    let encoder = RowEncoder::new(&resolved);
    let catalog = CatalogState::default();
    let decoder = RowDecoder::new(&resolved, &catalog);

    // Generate the timestamps as u64 directly instead of casting a usize index.
    let timestamps = (0u64..).step_by(10).map(HlcTimestamp::from_u64);

    // Zipping with the array consumes it by value, so the row values can be
    // moved into `input_row` without cloning.
    for (hlc, (id, name)) in timestamps.zip(rows) {
        let mut key_buf = BytesMut::with_capacity(128);
        let mut value_buf = BytesMut::with_capacity(128);
        let input_row = [id, name];
        encoder.encode_row(&input_row, hlc, &mut key_buf, &mut value_buf);

        let result = decoder
            .decode_row(&mut key_buf.freeze(), &mut value_buf.freeze())
            .unwrap();

        // Check the length first so a short result fails with a clear message
        // instead of an index-out-of-bounds panic below.
        assert_eq!(result.len(), input_row.len());
        assert_eq!(result[0], input_row[0]);
        assert_eq!(result[1], input_row[1]);
    }
}
/// The first byte of an encoded row key is the `KeySpace::TableRow` tag.
#[test]
fn encode_row_key_starts_with_keyspace_prefix() {
    let expected_prefix = KeySpace::TableRow as u8;
    let (key, _value) = encode(
        &[
            TempestValue::Int64(1),
            TempestValue::String(Cow::Borrowed("alice")),
        ],
        HlcTimestamp::new(0, 0),
    );
    assert_eq!(key[0], expected_prefix);
}
/// The last eight bytes of an encoded row key, read as a big-endian `u64`,
/// equal the timestamp the row was encoded with.
#[test]
fn encode_row_key_ends_with_hlc() {
    let hlc = HlcTimestamp::new(12345, 7);
    let (key, _value) = encode(
        &[
            TempestValue::Int64(1),
            TempestValue::String(Cow::Borrowed("alice")),
        ],
        hlc,
    );

    // The trailing bytes are as wide as a `u64`, i.e. eight bytes.
    let tail_start = key.len() - std::mem::size_of::<u64>();
    let tail: &[u8] = &key[tail_start..];
    let trailing = u64::from_be_bytes(tail.try_into().unwrap());

    assert_eq!(trailing, *hlc);
}
/// The encoded value carries the non-primary-key columns and nothing that
/// depends on the primary key.
///
/// The value layout is not inspected directly. The test checks that the value
/// is non-empty, is unchanged when only the primary key differs, and changes
/// when the non-key column differs.
#[test]
fn encode_row_value_contains_non_pk_columns_only() {
    let hlc = HlcTimestamp::new(0, 0);

    let row = &[TempestValue::Int64(42), TempestValue::String("bob".into())];
    let (_, val) = encode(row, hlc);
    assert!(!val.is_empty());

    // Same non-key column, different primary key: the value must be identical.
    let other_pk = &[TempestValue::Int64(43), TempestValue::String("bob".into())];
    let (_, val_other_pk) = encode(other_pk, hlc);
    assert_eq!(val, val_other_pk);

    // Same primary key, different non-key column: the value must differ.
    let other_name = &[TempestValue::Int64(42), TempestValue::String("alice".into())];
    let (_, val_other_name) = encode(other_name, hlc);
    assert_ne!(val, val_other_name);
}
/// Encoding a row whose first value is a `Bool` must panic, because the
/// fixture table declares that column as `Int64`.
///
/// NOTE(review): `should_panic` has no `expected` message, so any panic on
/// this path satisfies the test.
#[test]
#[should_panic]
fn typecheck_rejects_wrong_type() {
    let mistyped_row = [
        TempestValue::Bool(true),
        TempestValue::String(Cow::Borrowed("x")),
    ];
    let _ = encode(&mistyped_row, HlcTimestamp::new(0, 0));
}
/// Two rows that differ only in their first (primary-key) column encode to
/// different keys at the same timestamp.
#[test]
fn different_pk_values_produce_different_keys() {
    let hlc = HlcTimestamp::new(0, 0);

    // Encode a row that differs only in the id column and keep just the key.
    let key_for = |id: i64| {
        let row = [
            TempestValue::Int64(id),
            TempestValue::String(Cow::Borrowed("alice")),
        ];
        encode(&row, hlc).0
    };

    let first = key_for(1);
    let second = key_for(2);
    assert_ne!(first, second);
}
/// For the same row, the key encoded with `HlcTimestamp::new(2, 0)` compares
/// less than the key encoded with `HlcTimestamp::new(1, 0)` under
/// `EngineComparer::compare_physical`.
#[test]
fn newer_hlc_sorts_before_older_for_same_pk() {
    let same_row = [
        TempestValue::Int64(1),
        TempestValue::String(Cow::Borrowed("alice")),
    ];

    let older_key = encode(&same_row, HlcTimestamp::new(1, 0)).0;
    let newer_key = encode(&same_row, HlcTimestamp::new(2, 0)).0;

    let ordering = EngineComparer::compare_physical(&newer_key, &older_key);
    assert!(ordering.is_lt());
}