use crate::schema::TableSchema;
use crate::types::ComparatorType;
use crate::{Error, Result, Value};
pub fn decode_partition_key_columns(
data: &[u8],
schema: &TableSchema,
) -> Result<Vec<(String, Value)>> {
if schema.partition_keys.is_empty() {
return Err(Error::InvalidInput(
"Schema has no partition keys".to_string(),
));
}
if data.is_empty() {
return Err(Error::InvalidInput("Empty partition key bytes".to_string()));
}
let mut columns = Vec::with_capacity(schema.partition_keys.len());
if schema.partition_keys.len() == 1 {
let key_col = &schema.partition_keys[0];
let comparator = ComparatorType::from_data_type(&key_col.data_type)?;
let value = deserialize_value_bytes(data, &comparator)?;
columns.push((key_col.name.clone(), value));
} else {
let mut offset = 0;
for key_col in &schema.partition_keys {
if offset + 2 > data.len() {
return Err(Error::InvalidInput(format!(
"Truncated multi-component partition key at offset {}",
offset
)));
}
let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
offset += 2;
if offset + len > data.len() {
return Err(Error::InvalidInput(format!(
"Partition key component extends beyond data: offset={}, len={}, data_len={}",
offset,
len,
data.len()
)));
}
let comparator = ComparatorType::from_data_type(&key_col.data_type)?;
let value = deserialize_value_bytes(&data[offset..offset + len], &comparator)?;
columns.push((key_col.name.clone(), value));
offset += len;
if offset < data.len() {
offset += 1;
}
}
}
Ok(columns)
}
pub fn deserialize_value_bytes(data: &[u8], comparator: &ComparatorType) -> Result<Value> {
match comparator {
ComparatorType::Boolean => {
if data.is_empty() {
return Err(Error::InvalidInput("Empty boolean value".to_string()));
}
Ok(Value::Boolean(data[0] != 0))
}
ComparatorType::TinyInt => {
if data.is_empty() {
return Err(Error::InvalidInput("Empty tinyint value".to_string()));
}
Ok(Value::TinyInt(data[0] as i8))
}
ComparatorType::SmallInt => {
let bytes: [u8; 2] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("SmallInt requires 2 bytes, got {}", data.len()))
})?;
Ok(Value::SmallInt(i16::from_be_bytes(bytes)))
}
ComparatorType::Int => {
let bytes: [u8; 4] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Int requires 4 bytes, got {}", data.len()))
})?;
Ok(Value::Integer(i32::from_be_bytes(bytes)))
}
ComparatorType::BigInt => {
let bytes: [u8; 8] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("BigInt requires 8 bytes, got {}", data.len()))
})?;
Ok(Value::BigInt(i64::from_be_bytes(bytes)))
}
ComparatorType::Counter => {
let bytes: [u8; 8] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Counter requires 8 bytes, got {}", data.len()))
})?;
Ok(Value::Counter(i64::from_be_bytes(bytes)))
}
ComparatorType::Float32 => {
let bytes: [u8; 4] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Float32 requires 4 bytes, got {}", data.len()))
})?;
Ok(Value::Float32(f32::from_bits(u32::from_be_bytes(bytes))))
}
ComparatorType::Float => {
let bytes: [u8; 8] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Float requires 8 bytes, got {}", data.len()))
})?;
Ok(Value::Float(f64::from_bits(u64::from_be_bytes(bytes))))
}
ComparatorType::Text => {
let s = String::from_utf8(data.to_vec())
.map_err(|e| Error::InvalidInput(format!("Invalid UTF-8 in text value: {}", e)))?;
Ok(Value::Text(s))
}
ComparatorType::Blob => Ok(Value::Blob(data.to_vec())),
ComparatorType::Timestamp => {
let bytes: [u8; 8] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Timestamp requires 8 bytes, got {}", data.len()))
})?;
Ok(Value::Timestamp(i64::from_be_bytes(bytes)))
}
ComparatorType::Date => {
let bytes: [u8; 4] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Date requires 4 bytes, got {}", data.len()))
})?;
let stored = u32::from_be_bytes(bytes);
Ok(Value::Date((stored as i32).wrapping_add(i32::MIN)))
}
ComparatorType::Uuid => {
let bytes: [u8; 16] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("UUID requires 16 bytes, got {}", data.len()))
})?;
Ok(Value::Uuid(bytes))
}
ComparatorType::Custom(name) if name == "time" => {
let bytes: [u8; 8] = data.try_into().map_err(|_| {
Error::InvalidInput(format!("Time requires 8 bytes, got {}", data.len()))
})?;
Ok(Value::Time(i64::from_be_bytes(bytes)))
}
ComparatorType::Custom(name) if name == "inet" => Ok(Value::Inet(data.to_vec())),
ComparatorType::Varint => Ok(Value::Varint(data.to_vec())),
ComparatorType::Decimal => {
if data.len() < 4 {
return Err(Error::InvalidInput(format!(
"Decimal requires at least 4 bytes, got {}",
data.len()
)));
}
let scale_bytes: [u8; 4] = data[..4]
.try_into()
.map_err(|_| Error::InvalidInput("Decimal scale conversion failed".to_string()))?;
let scale = i32::from_be_bytes(scale_bytes);
let unscaled = data[4..].to_vec();
Ok(Value::Decimal { scale, unscaled })
}
ComparatorType::Duration => {
if data.len() < 16 {
return Err(Error::InvalidInput(format!(
"Duration requires 16 bytes, got {}",
data.len()
)));
}
let months = i32::from_be_bytes(data[..4].try_into().map_err(|_| {
Error::InvalidInput("Duration months conversion failed".to_string())
})?);
let days =
i32::from_be_bytes(data[4..8].try_into().map_err(|_| {
Error::InvalidInput("Duration days conversion failed".to_string())
})?);
let nanos = i64::from_be_bytes(data[8..16].try_into().map_err(|_| {
Error::InvalidInput("Duration nanos conversion failed".to_string())
})?);
Ok(Value::Duration {
months,
days,
nanos,
})
}
_ => Err(Error::InvalidInput(format!(
"Unsupported comparator for deserialization: {:?}",
comparator
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::KeyColumn;
use std::collections::HashMap;
fn schema_with_pks(pks: &[(&str, &str)]) -> TableSchema {
TableSchema {
keyspace: "ks".to_string(),
table: "t".to_string(),
partition_keys: pks
.iter()
.enumerate()
.map(|(i, (name, ty))| KeyColumn {
name: name.to_string(),
data_type: ty.to_string(),
position: i,
})
.collect(),
clustering_keys: vec![],
columns: vec![],
comments: HashMap::new(),
}
}
#[test]
fn single_text_pk_is_raw_bytes() {
let schema = schema_with_pks(&[("id", "text")]);
let cols = decode_partition_key_columns(b"k0000000000000000", &schema).unwrap();
assert_eq!(
cols,
vec![(
"id".to_string(),
Value::Text("k0000000000000000".to_string())
)]
);
}
#[test]
fn single_uuid_pk_is_raw_16_bytes() {
let schema = schema_with_pks(&[("id", "uuid")]);
let raw = [
0u8, 35, 236, 231, 124, 78, 71, 5, 144, 104, 209, 165, 158, 197, 254, 25,
];
let cols = decode_partition_key_columns(&raw, &schema).unwrap();
assert_eq!(cols, vec![("id".to_string(), Value::Uuid(raw))]);
}
#[test]
fn composite_text_pk_uses_framing_per_component() {
let schema = schema_with_pks(&[("application_id", "text"), ("metric_name", "text")]);
let data = [0u8, 1, b'a', 0, 0, 4, b'v', b'i', b'e', b'w', 0];
let cols = decode_partition_key_columns(&data, &schema).unwrap();
assert_eq!(
cols,
vec![
("application_id".to_string(), Value::Text("a".to_string())),
("metric_name".to_string(), Value::Text("view".to_string())),
]
);
}
#[test]
fn composite_text_and_date_decode_by_type() {
let schema = schema_with_pks(&[("symbol", "text"), ("trading_day", "date")]);
let data = [0u8, 4, b'A', b'A', b'P', b'L', 0, 0, 4, 128, 0, 79, 136, 0];
let cols = decode_partition_key_columns(&data, &schema).unwrap();
assert_eq!(
cols[0],
("symbol".to_string(), Value::Text("AAPL".to_string()))
);
assert_eq!(cols[1].0, "trading_day");
assert!(
matches!(cols[1].1, Value::Date(_)),
"date component must decode to Value::Date, got {:?}",
cols[1].1
);
}
#[test]
fn empty_bytes_error() {
let schema = schema_with_pks(&[("id", "text")]);
assert!(decode_partition_key_columns(&[], &schema).is_err());
}
}