use crate::{
schema::{registry::ParsingContext, CqlType},
types::{ComparatorType, Value},
Error, Result,
};
use std::collections::HashMap;
/// Decodes serialized partition keys, clustering keys, column values, and whole rows using the
/// schema and comparators held in a [`ParsingContext`].
#[derive(Debug)]
pub struct SchemaParser {
/// Schema plus comparators; `SchemaParser::new` only accepts a context whose
/// `is_complete()` returns true.
context: ParsingContext,
}
impl SchemaParser {
pub fn new(context: ParsingContext) -> Result<Self> {
if !context.is_complete() {
return Err(Error::Schema(
"Incomplete parsing context: schema must be fully defined".to_string(),
));
}
Ok(Self { context })
}
pub fn parse_partition_key(&self, data: &[u8]) -> Result<Vec<Value>> {
if self.context.partition_comparators.is_empty() {
return Err(Error::Schema(
"No partition key comparators defined in schema".to_string(),
));
}
let mut values = Vec::new();
let mut offset = 0;
for (idx, comparator) in self.context.partition_comparators.iter().enumerate() {
let key_column = &self.context.schema.partition_keys[idx];
let (value, consumed) = self.parse_value_with_comparator(
&data[offset..],
comparator,
&key_column.data_type,
)?;
values.push(value);
offset += consumed;
}
Ok(values)
}
pub fn parse_clustering_keys(&self, data: &[u8]) -> Result<Vec<Value>> {
if self.context.clustering_comparators.is_empty() {
return Ok(Vec::new()); }
let mut values = Vec::new();
let mut offset = 0;
for (idx, comparator) in self.context.clustering_comparators.iter().enumerate() {
if offset >= data.len() {
break; }
let key_column = &self.context.schema.clustering_keys[idx];
let (value, consumed) = self.parse_value_with_comparator(
&data[offset..],
comparator,
&key_column.data_type,
)?;
values.push(value);
offset += consumed;
}
Ok(values)
}
pub fn parse_column_value(&self, column_name: &str, data: &[u8]) -> Result<(Value, usize)> {
let comparator = self
.context
.get_column_comparator(column_name)
.ok_or_else(|| {
Error::Schema(format!(
"Column '{}' not found in schema for {}.{}",
column_name, self.context.schema.keyspace, self.context.schema.table
))
})?;
let _column = self
.context
.schema
.columns
.iter()
.find(|c| c.name == column_name)
.ok_or_else(|| Error::Schema(format!("Column '{}' not found", column_name)))?;
self.parse_value_with_provided_comparator(data, comparator)
}
/// Decodes a value whose CQL type is given as the schema type string `type_str`.
///
/// `comparator` is forwarded to the typed decoder. Type-string parse errors are propagated.
fn parse_value_with_comparator(
    &self,
    data: &[u8],
    comparator: &ComparatorType,
    type_str: &str,
) -> Result<(Value, usize)> {
    let declared = CqlType::parse(type_str)?;
    self.parse_typed_value(data, &declared, comparator)
}
/// Decodes a value whose CQL type is derived from `comparator` alone.
fn parse_value_with_provided_comparator(
    &self,
    data: &[u8],
    comparator: &ComparatorType,
) -> Result<(Value, usize)> {
    self.comparator_to_cql_type(comparator)
        .and_then(|derived| self.parse_typed_value(data, &derived, comparator))
}
fn parse_typed_value(
&self,
data: &[u8],
cql_type: &CqlType,
comparator: &ComparatorType,
) -> Result<(Value, usize)> {
match cql_type {
CqlType::Boolean => self.parse_boolean(data),
CqlType::TinyInt => self.parse_tinyint(data),
CqlType::SmallInt => self.parse_smallint(data),
CqlType::Int => self.parse_int(data),
CqlType::BigInt => self.parse_bigint(data),
CqlType::Counter => self.parse_counter(data),
CqlType::Float => self.parse_float(data),
CqlType::Double => self.parse_double(data),
CqlType::Text | CqlType::Varchar | CqlType::Ascii => self.parse_text(data),
CqlType::Blob => self.parse_blob(data),
CqlType::Timestamp => self.parse_timestamp(data),
CqlType::Uuid | CqlType::TimeUuid => self.parse_uuid(data),
CqlType::List(elem_type) => self.parse_list(data, elem_type, comparator),
CqlType::Set(elem_type) => self.parse_set(data, elem_type, comparator),
CqlType::Map(key_type, val_type) => {
self.parse_map(data, key_type, val_type, comparator)
}
CqlType::Tuple(field_types) => self.parse_tuple(data, field_types, comparator),
CqlType::Udt(type_name, fields) => self.parse_udt(data, type_name, fields, comparator),
CqlType::Frozen(inner_type) => self.parse_frozen(data, inner_type, comparator),
_ => Err(Error::Schema(format!(
"Unsupported type for schema-driven parsing: {:?}",
cql_type
))),
}
}
fn parse_boolean(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.is_empty() {
return Err(Error::schema("Insufficient data for boolean".to_string()));
}
Ok((Value::Boolean(data[0] != 0), 1))
}
fn parse_tinyint(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.is_empty() {
return Err(Error::schema("Insufficient data for tinyint".to_string()));
}
Ok((Value::TinyInt(data[0] as i8), 1))
}
fn parse_smallint(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 2 {
return Err(Error::schema("Insufficient data for smallint".to_string()));
}
let value = i16::from_be_bytes([data[0], data[1]]);
Ok((Value::SmallInt(value), 2))
}
fn parse_int(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 4 {
return Err(Error::schema("Insufficient data for int".to_string()));
}
let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
Ok((Value::Integer(value), 4))
}
/// Decodes a big-endian signed 64-bit integer. Consumes exactly 8 bytes.
fn parse_bigint(&self, data: &[u8]) -> Result<(Value, usize)> {
    const WIDTH: usize = 8;
    if WIDTH > data.len() {
        return Err(Error::schema("Insufficient data for bigint".to_string()));
    }
    let mut raw = [0u8; WIDTH];
    raw.copy_from_slice(&data[..WIDTH]);
    Ok((Value::BigInt(i64::from_be_bytes(raw)), WIDTH))
}
fn parse_counter(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 8 {
return Err(Error::schema("Insufficient data for counter".to_string()));
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[0..8]);
let value = i64::from_be_bytes(bytes);
Ok((Value::Counter(value), 8))
}
fn parse_float(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 4 {
return Err(Error::schema("Insufficient data for float".to_string()));
}
let mut bytes = [0u8; 4];
bytes.copy_from_slice(&data[0..4]);
let value = f32::from_be_bytes(bytes);
Ok((Value::Float32(value), 4))
}
fn parse_double(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 8 {
return Err(Error::schema("Insufficient data for double".to_string()));
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[0..8]);
let value = f64::from_be_bytes(bytes);
Ok((Value::Float(value), 8))
}
fn parse_text(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 4 {
return Err(Error::schema(
"Insufficient data for text length".to_string(),
));
}
let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
if data.len() < 4 + len {
return Err(Error::schema(
"Insufficient data for text content".to_string(),
));
}
let text = String::from_utf8(data[4..4 + len].to_vec())
.map_err(|e| Error::schema(format!("Invalid UTF-8: {}", e)))?;
Ok((Value::Text(text), 4 + len))
}
fn parse_blob(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 4 {
return Err(Error::schema(
"Insufficient data for blob length".to_string(),
));
}
let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
if data.len() < 4 + len {
return Err(Error::schema(
"Insufficient data for blob content".to_string(),
));
}
Ok((Value::Blob(data[4..4 + len].to_vec()), 4 + len))
}
fn parse_timestamp(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 8 {
return Err(Error::schema("Insufficient data for timestamp".to_string()));
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[0..8]);
let millis = i64::from_be_bytes(bytes);
Ok((Value::Timestamp(millis), 8))
}
fn parse_uuid(&self, data: &[u8]) -> Result<(Value, usize)> {
if data.len() < 16 {
return Err(Error::schema("Insufficient data for UUID".to_string()));
}
let uuid_bytes: [u8; 16] = data[0..16]
.try_into()
.map_err(|_| Error::schema("Invalid UUID bytes".to_string()))?;
Ok((Value::Uuid(uuid_bytes), 16))
}
fn parse_list(
&self,
data: &[u8],
elem_type: &CqlType,
_comparator: &ComparatorType,
) -> Result<(Value, usize)> {
let mut offset = 0;
if data.len() < 4 {
return Err(Error::schema("Insufficient data for list size".to_string()));
}
let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
offset += 4;
let mut elements = Vec::with_capacity(count);
let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
for _ in 0..count {
let (value, consumed) =
self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
elements.push(value);
offset += consumed;
}
Ok((Value::List(elements), offset))
}
fn parse_set(
&self,
data: &[u8],
elem_type: &CqlType,
_comparator: &ComparatorType,
) -> Result<(Value, usize)> {
let mut offset = 0;
if data.len() < 4 {
return Err(Error::schema("Insufficient data for set size".to_string()));
}
let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
offset += 4;
let mut elements = Vec::with_capacity(count);
let elem_comparator = ComparatorType::from_cql_type(elem_type)?;
for _ in 0..count {
let (value, consumed) =
self.parse_typed_value(&data[offset..], elem_type, &elem_comparator)?;
elements.push(value);
offset += consumed;
}
Ok((Value::Set(elements), offset))
}
fn parse_map(
&self,
data: &[u8],
key_type: &CqlType,
val_type: &CqlType,
_comparator: &ComparatorType,
) -> Result<(Value, usize)> {
let mut offset = 0;
if data.len() < 4 {
return Err(Error::schema("Insufficient data for map size".to_string()));
}
let count = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
offset += 4;
let mut map = Vec::with_capacity(count);
let key_comparator = ComparatorType::from_cql_type(key_type)?;
let val_comparator = ComparatorType::from_cql_type(val_type)?;
for _ in 0..count {
let (key, key_consumed) =
self.parse_typed_value(&data[offset..], key_type, &key_comparator)?;
offset += key_consumed;
let (value, val_consumed) =
self.parse_typed_value(&data[offset..], val_type, &val_comparator)?;
offset += val_consumed;
map.push((key, value));
}
Ok((Value::Map(map), offset))
}
/// Decodes a tuple: one value per entry in `field_types`, read back-to-back with no count
/// prefix.
///
/// Returns [`Value::Tuple`] and the total number of bytes consumed. The first comparator or
/// decode error stops parsing and is returned.
fn parse_tuple(
    &self,
    data: &[u8],
    field_types: &[CqlType],
    _comparator: &ComparatorType,
) -> Result<(Value, usize)> {
    let mut cursor = 0usize;
    let values = field_types
        .iter()
        .map(|field_type| -> Result<Value> {
            let field_comparator = ComparatorType::from_cql_type(field_type)?;
            let (value, used) =
                self.parse_typed_value(&data[cursor..], field_type, &field_comparator)?;
            cursor += used;
            Ok(value)
        })
        .collect::<Result<Vec<_>>>()?;
    Ok((Value::Tuple(values), cursor))
}
/// Decodes a user-defined type value field by field, in the order given by `fields`.
///
/// Before each field, the next 4 bytes (when at least 4 remain) are peeked as a big-endian
/// `i32`:
/// - If it is negative, the field is recorded as null (`value: None`) and those 4 bytes are
///   skipped.
/// - Otherwise the field is decoded from the current offset by its own type's decoder. The
///   peeked bytes are NOT consumed first.
///
/// The resulting value's `keyspace` is taken from this parser's schema. Returns the value and
/// the total number of bytes consumed.
///
/// NOTE(review): non-null fields do not skip a length prefix. Fixed-width decoders (for
/// example `parse_int`, which reads the first 4 bytes directly) therefore read the very bytes
/// that were peeked. As a result, a fixed-width value whose first byte has the high bit set
/// (for example a negative `int`) would be mistaken for null. Confirm the intended encoding
/// of UDT fields.
fn parse_udt(
&self,
data: &[u8],
type_name: &str,
fields: &[(String, CqlType)],
_comparator: &ComparatorType,
) -> Result<(Value, usize)> {
let mut offset = 0;
let mut field_values = Vec::with_capacity(fields.len());
for (field_name, field_type) in fields {
let field_comparator = ComparatorType::from_cql_type(field_type)?;
// Peek (without consuming) a possible 4-byte null marker.
if data.len() >= offset + 4 {
let field_len = i32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]);
// Negative marker => null field; skip only the 4 marker bytes.
if field_len < 0 {
field_values.push(crate::types::UdtField {
name: field_name.clone(),
value: None,
});
offset += 4;
continue;
}
}
// Non-null: decode from `offset`, which still points at the peeked bytes.
let (value, consumed) =
self.parse_typed_value(&data[offset..], field_type, &field_comparator)?;
field_values.push(crate::types::UdtField {
name: field_name.clone(),
value: Some(value),
});
offset += consumed;
}
Ok((
Value::Udt(crate::types::UdtValue {
type_name: type_name.to_string(),
keyspace: self.context.schema.keyspace.clone(),
fields: field_values,
}),
offset,
))
}
/// Decodes `inner_type` from `data` and wraps the result in `Value::Frozen`.
///
/// The byte count consumed is the inner decoder's, unchanged.
fn parse_frozen(
    &self,
    data: &[u8],
    inner_type: &CqlType,
    _comparator: &ComparatorType,
) -> Result<(Value, usize)> {
    let inner_comparator = ComparatorType::from_cql_type(inner_type)?;
    self.parse_typed_value(data, inner_type, &inner_comparator)
        .map(|(inner_value, consumed)| (Value::Frozen(Box::new(inner_value)), consumed))
}
/// Decodes one value per schema column, in `schema.columns` order, into a map keyed by
/// column name.
///
/// Once `data` is exhausted, every remaining column is recorded as `Value::Null`. Before each
/// column, the next 4 bytes (when at least 4 remain) are peeked as a big-endian `i32`:
/// - If it is negative, the column is `Value::Null` and those 4 bytes are skipped.
/// - Otherwise the column is decoded by `parse_column_value` from the current offset. The
///   peeked bytes are NOT consumed first.
///
/// # Errors
/// Propagates any error from `parse_column_value`.
///
/// NOTE(review): non-null values do not skip a length prefix. Fixed-width column types
/// therefore read the peeked bytes as data, and a fixed-width value whose first byte has the
/// high bit set (for example a negative `int`) would be recorded as null. Confirm the
/// intended row encoding.
pub fn parse_row(&self, data: &[u8]) -> Result<HashMap<String, Value>> {
let mut row = HashMap::new();
let mut offset = 0;
for column in &self.context.schema.columns {
// Input exhausted: the remaining columns are treated as null.
if offset >= data.len() {
row.insert(column.name.clone(), Value::Null);
continue;
}
// Peek (without consuming) a possible 4-byte null marker.
if data.len() >= offset + 4 {
let value_len = i32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]);
if value_len < 0 {
row.insert(column.name.clone(), Value::Null);
offset += 4;
continue;
}
}
let (value, consumed) = self.parse_column_value(&column.name, &data[offset..])?;
row.insert(column.name.clone(), value);
offset += consumed;
}
Ok(row)
}
#[allow(clippy::only_used_in_recursion)]
fn comparator_to_cql_type(&self, comparator: &ComparatorType) -> Result<CqlType> {
match comparator {
ComparatorType::Boolean => Ok(CqlType::Boolean),
ComparatorType::TinyInt => Ok(CqlType::TinyInt),
ComparatorType::SmallInt => Ok(CqlType::SmallInt),
ComparatorType::Int => Ok(CqlType::Int),
ComparatorType::BigInt => Ok(CqlType::BigInt),
ComparatorType::Counter => Ok(CqlType::Counter),
ComparatorType::Float32 => Ok(CqlType::Float),
ComparatorType::Float => Ok(CqlType::Double),
ComparatorType::Text => Ok(CqlType::Text),
ComparatorType::Blob => Ok(CqlType::Blob),
ComparatorType::Timestamp => Ok(CqlType::Timestamp),
ComparatorType::Uuid => Ok(CqlType::Uuid),
ComparatorType::Varint => Ok(CqlType::Custom("varint".to_string())),
ComparatorType::Decimal => Ok(CqlType::Decimal),
ComparatorType::Duration => Ok(CqlType::Duration),
ComparatorType::Date => Ok(CqlType::Date),
ComparatorType::Json => Ok(CqlType::Custom("json".to_string())),
ComparatorType::List(elem_comparator) => {
let elem_type = self.comparator_to_cql_type(elem_comparator)?;
Ok(CqlType::List(Box::new(elem_type)))
}
ComparatorType::Set(elem_comparator) => {
let elem_type = self.comparator_to_cql_type(elem_comparator)?;
Ok(CqlType::Set(Box::new(elem_type)))
}
ComparatorType::Map(key_comparator, val_comparator) => {
let key_type = self.comparator_to_cql_type(key_comparator)?;
let val_type = self.comparator_to_cql_type(val_comparator)?;
Ok(CqlType::Map(Box::new(key_type), Box::new(val_type)))
}
ComparatorType::Tuple(field_comparators) => {
let mut field_types = Vec::new();
for field_comparator in field_comparators {
field_types.push(self.comparator_to_cql_type(field_comparator)?);
}
Ok(CqlType::Tuple(field_types))
}
ComparatorType::Udt {
type_name,
field_comparators,
..
} => {
let mut fields = Vec::new();
for (field_name, field_comparator) in field_comparators {
let field_type = self.comparator_to_cql_type(field_comparator)?;
fields.push((field_name.clone(), field_type));
}
Ok(CqlType::Udt(type_name.clone(), fields))
}
ComparatorType::Frozen(inner_comparator) => {
let inner_type = self.comparator_to_cql_type(inner_comparator)?;
Ok(CqlType::Frozen(Box::new(inner_type)))
}
ComparatorType::Custom(type_name) => Ok(CqlType::Custom(type_name.clone())),
}
}
}
// Unit tests for this module live in `parser_tests.rs` and are compiled only for test builds.
#[cfg(test)]
#[path = "parser_tests.rs"]
mod tests;