use crate::error::{Error, Result};
use crate::schema::{ClusteringOrder, TableSchema};
use crate::types::{ComparatorType, Value};
use std::cmp::Ordering;
/// Identifies a table by its keyspace name and table name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct TableId {
/// Name of the keyspace the table belongs to.
pub keyspace: String,
/// Name of the table within the keyspace.
pub table: String,
}
impl TableId {
    /// Builds a `TableId` from anything convertible into owned strings.
    pub fn new(keyspace: impl Into<String>, table: impl Into<String>) -> Self {
        let keyspace = keyspace.into();
        let table = table.into();
        Self { keyspace, table }
    }

    /// Returns the identifier in `keyspace.table` form as a freshly allocated `String`.
    pub fn qualified_name(&self) -> String {
        let mut name = String::with_capacity(self.keyspace.len() + 1 + self.table.len());
        name.push_str(&self.keyspace);
        name.push('.');
        name.push_str(&self.table);
        name
    }
}
impl std::fmt::Display for TableId {
    /// Writes the identifier as `keyspace.table`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.keyspace)?;
        f.write_str(".")?;
        f.write_str(&self.table)
    }
}
/// A single mutation against one table: the addressed partition (and optional
/// clustering key), the cell operations to apply, and any tombstones.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Mutation {
/// Table the mutation applies to.
pub table: TableId,
/// Partition key of the addressed partition.
pub partition_key: PartitionKey,
/// Clustering key of the addressed row, if any.
pub clustering_key: Option<ClusteringKey>,
/// Cell-level operations carried by this mutation.
pub operations: Vec<CellOperation>,
/// Mutation timestamp; microseconds per the field name (epoch not shown in this file).
pub timestamp_micros: i64,
/// Optional time-to-live; seconds per the field name.
pub ttl_seconds: Option<u32>,
/// Optional partition-level tombstone; `Mutation::new` leaves it `None`.
pub partition_tombstone: Option<PartitionTombstone>,
/// Range tombstones carried by this mutation; `Mutation::new` leaves it empty.
pub range_tombstones: Vec<RangeTombstone>,
}
impl Mutation {
pub fn new(
table: TableId,
partition_key: PartitionKey,
clustering_key: Option<ClusteringKey>,
operations: Vec<CellOperation>,
timestamp_micros: i64,
ttl_seconds: Option<u32>,
) -> Self {
Self {
table,
partition_key,
clustering_key,
operations,
timestamp_micros,
ttl_seconds,
partition_tombstone: None,
range_tombstones: Vec::new(),
}
}
pub fn decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
self.partition_key.to_decorated_key(schema)
}
}
/// Deletion marker that applies to a whole partition.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PartitionTombstone {
/// Deletion timestamp (64-bit). Units are not shown in this file — presumably
/// the same scale as `Mutation::timestamp_micros`; TODO confirm.
pub deletion_time: i64,
/// Local deletion time (32-bit). Presumably seconds since the Unix epoch, as
/// in Cassandra — TODO confirm.
pub local_deletion_time: i32,
}
/// Deletion marker that applies to a range of clustering positions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RangeTombstone {
/// Start bound of the deleted range.
pub start: ClusteringBound,
/// End bound of the deleted range.
pub end: ClusteringBound,
/// Deletion timestamp (64-bit). Units are not shown in this file — presumably
/// the same scale as `Mutation::timestamp_micros`; TODO confirm.
pub deletion_time: i64,
/// Local deletion time (32-bit). Presumably seconds since the Unix epoch, as
/// in Cassandra — TODO confirm.
pub local_deletion_time: i32,
}
/// One end of a clustering range, as used by `RangeTombstone::start` / `end`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ClusteringBound {
/// Bound at the given clustering key, with the key itself included (per the variant name).
Inclusive(ClusteringKey),
/// Bound at the given clustering key, with the key itself excluded (per the variant name).
Exclusive(ClusteringKey),
/// Key-less bound; presumably sorts before every clustering key — TODO confirm
/// against the code that interprets bounds.
Bottom,
/// Key-less bound; presumably sorts after every clustering key — TODO confirm
/// against the code that interprets bounds.
Top,
}
/// A single cell- or row-level operation inside a `Mutation`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CellOperation {
/// Write `value` to `column`.
Write {
/// Name of the column being written.
column: String,
/// Value to store.
value: Value,
},
/// Write `value` to `column` with a per-cell time-to-live.
WriteWithTtl {
/// Name of the column being written.
column: String,
/// Value to store.
value: Value,
/// Time-to-live for this cell; seconds per the field name.
ttl_seconds: u32,
},
/// Delete the named column.
Delete {
/// Name of the column being deleted.
column: String,
},
/// Delete the whole row (per the variant name; the row is presumably the one
/// addressed by the enclosing mutation's keys — TODO confirm).
DeleteRow,
}
/// Partition key expressed as an ordered list of `(column name, value)` pairs.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PartitionKey {
/// Key components in order. `to_bytes` matches them to the schema's partition
/// key columns by position, not by name.
pub columns: Vec<(String, Value)>,
}
impl PartitionKey {
pub fn new(columns: Vec<(String, Value)>) -> Self {
Self { columns }
}
pub fn single(column: impl Into<String>, value: Value) -> Self {
Self {
columns: vec![(column.into(), value)],
}
}
pub fn to_bytes(&self, schema: &TableSchema) -> Result<Vec<u8>> {
if self.columns.is_empty() {
return Err(Error::InvalidInput("Empty partition key".to_string()));
}
if self.columns.len() != schema.partition_keys.len() {
return Err(Error::InvalidInput(format!(
"Partition key column count mismatch: expected {}, got {}",
schema.partition_keys.len(),
self.columns.len()
)));
}
let mut result = Vec::new();
if self.columns.len() == 1 {
let value_bytes =
self.serialize_value(&self.columns[0].1, &schema.partition_keys[0])?;
result.extend_from_slice(&value_bytes);
return Ok(result);
}
for (i, (_, value)) in self.columns.iter().enumerate() {
let value_bytes = self.serialize_value(value, &schema.partition_keys[i])?;
let len = value_bytes.len();
if len > u16::MAX as usize {
return Err(Error::InvalidInput(format!(
"Partition key component too large: {} bytes",
len
)));
}
result.extend_from_slice(&(len as u16).to_be_bytes());
result.extend_from_slice(&value_bytes);
result.push(0x00);
}
Ok(result)
}
pub fn to_decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
let key_bytes = self.to_bytes(schema)?;
let token = calculate_murmur3_token(&key_bytes)?;
Ok(DecoratedKey::new(token, key_bytes))
}
pub fn from_bytes(data: &[u8], schema: &TableSchema) -> Result<Self> {
if schema.partition_keys.is_empty() {
return Err(Error::InvalidInput(
"Schema has no partition keys".to_string(),
));
}
if data.is_empty() {
return Err(Error::InvalidInput("Empty partition key bytes".to_string()));
}
let columns =
crate::storage::partition_key_codec::decode_partition_key_columns(data, schema)?;
Ok(PartitionKey { columns })
}
fn serialize_value(
&self,
value: &Value,
key_column: &crate::schema::KeyColumn,
) -> Result<Vec<u8>> {
let comparator = ComparatorType::from_data_type(&key_column.data_type)?;
serialize_value_bytes(value, &comparator)
}
}
/// Clustering key expressed as an ordered list of `(column name, value)` pairs.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ClusteringKey {
/// Key components in order. The comparison routines in this file compare the
/// values position by position and ignore the names.
pub columns: Vec<(String, Value)>,
}
impl ClusteringKey {
pub fn new(columns: Vec<(String, Value)>) -> Self {
Self { columns }
}
pub fn single(column: impl Into<String>, value: Value) -> Self {
Self {
columns: vec![(column.into(), value)],
}
}
pub fn compare(&self, other: &Self, schema: &TableSchema) -> Result<Ordering> {
for (i, ((_, a_val), (_, b_val))) in
self.columns.iter().zip(other.columns.iter()).enumerate()
{
if i >= schema.clustering_keys.len() {
return Err(Error::Schema(format!(
"Clustering key has more columns than schema: {} > {}",
i + 1,
schema.clustering_keys.len()
)));
}
let cluster_col = &schema.clustering_keys[i];
let ordering = compare_values(a_val, b_val)?;
let final_ordering = if cluster_col.order == ClusteringOrder::Desc {
ordering.reverse()
} else {
ordering
};
if final_ordering != Ordering::Equal {
return Ok(final_ordering);
}
}
Ok(Ordering::Equal)
}
}
impl Ord for ClusteringKey {
    /// Schema-free ordering: compares values position by position in their
    /// natural (ascending) order, then falls back to the number of columns.
    ///
    /// When two values cannot be compared (`compare_values` returns an error),
    /// their `Debug` renderings are compared instead so that the method never
    /// panics and still yields a deterministic result.
    fn cmp(&self, other: &Self) -> Ordering {
        let first_difference = self
            .columns
            .iter()
            .zip(&other.columns)
            .map(|((_, lhs), (_, rhs))| match compare_values(lhs, rhs) {
                Ok(ordering) => ordering,
                Err(_) => format!("{lhs:?}").cmp(&format!("{rhs:?}")),
            })
            .find(|ordering| *ordering != Ordering::Equal);

        match first_difference {
            Some(ordering) => ordering,
            None => self.columns.len().cmp(&other.columns.len()),
        }
    }
}
impl PartialOrd for ClusteringKey {
    /// Always `Some`: delegates to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
// Marker impl required by `Ord`. NOTE(review): equality comes from the derived
// `PartialEq`, which also compares column names, while `Ord::cmp` compares
// values only — confirm the two are meant to agree.
impl Eq for ClusteringKey {}
/// Serialized partition key bytes paired with their token.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DecoratedKey {
/// Token for `key`; the primary sort field in the `Ord` impl.
pub token: i64,
/// Serialized partition key bytes; the tie-breaker when tokens are equal.
pub key: Vec<u8>,
}
impl DecoratedKey {
pub fn new(token: i64, key: Vec<u8>) -> Self {
Self { token, key }
}
pub fn from_key_bytes(key_bytes: Vec<u8>) -> Result<Self> {
let token = calculate_murmur3_token(&key_bytes)?;
Ok(Self::new(token, key_bytes))
}
}
impl Ord for DecoratedKey {
    /// Orders by token first, then by the key bytes (lexicographic, unsigned)
    /// so that distinct keys sharing a token still have a stable order.
    fn cmp(&self, other: &Self) -> Ordering {
        self.token
            .cmp(&other.token)
            .then_with(|| self.key.cmp(&other.key))
    }
}
impl PartialOrd for DecoratedKey {
    /// Always `Some`: delegates to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Computes the token for serialized partition key bytes.
///
/// An empty key maps to `i64::MIN`; every other key is hashed by
/// `crate::util::cassandra_murmur3::cassandra_murmur3_token`. The visible code
/// never produces an `Err`; the `Result` return type is kept for callers.
fn calculate_murmur3_token(key_bytes: &[u8]) -> Result<i64> {
    let token = if key_bytes.is_empty() {
        i64::MIN
    } else {
        crate::util::cassandra_murmur3::cassandra_murmur3_token(key_bytes)
    };
    Ok(token)
}
fn serialize_value_bytes(value: &Value, comparator: &ComparatorType) -> Result<Vec<u8>> {
match (value, comparator) {
(Value::Null, _) => Ok(Vec::new()),
(Value::Boolean(b), ComparatorType::Boolean) => Ok(vec![if *b { 1 } else { 0 }]),
(Value::TinyInt(n), ComparatorType::TinyInt) => Ok(vec![*n as u8]),
(Value::SmallInt(n), ComparatorType::SmallInt) => Ok(n.to_be_bytes().to_vec()),
(Value::Integer(n), ComparatorType::Int) => Ok(n.to_be_bytes().to_vec()),
(Value::BigInt(n), ComparatorType::BigInt) => Ok(n.to_be_bytes().to_vec()),
(Value::Counter(n), ComparatorType::Counter) => Ok(n.to_be_bytes().to_vec()),
(Value::Float32(f), ComparatorType::Float32) => Ok(f.to_bits().to_be_bytes().to_vec()),
(Value::Float(f), ComparatorType::Float) => Ok(f.to_bits().to_be_bytes().to_vec()),
(Value::Text(s), ComparatorType::Text) => Ok(s.as_bytes().to_vec()),
(Value::Blob(bytes), ComparatorType::Blob) => Ok(bytes.clone()),
(Value::Timestamp(millis), ComparatorType::Timestamp) => Ok(millis.to_be_bytes().to_vec()),
(Value::Date(days), ComparatorType::Date) => {
let stored = days.wrapping_sub(i32::MIN) as u32;
Ok(stored.to_be_bytes().to_vec())
}
(Value::Uuid(bytes), ComparatorType::Uuid) => Ok(bytes.to_vec()),
(Value::Time(nanos), ComparatorType::Custom(name)) if name == "time" => {
Ok(nanos.to_be_bytes().to_vec())
}
(Value::Inet(bytes), ComparatorType::Custom(name)) if name == "inet" => Ok(bytes.clone()),
(Value::Varint(bytes), ComparatorType::Varint) => Ok(bytes.clone()),
(Value::Decimal { scale, unscaled }, ComparatorType::Decimal) => {
let mut result = Vec::new();
result.extend_from_slice(&scale.to_be_bytes());
result.extend_from_slice(unscaled);
Ok(result)
}
(
Value::Duration {
months,
days,
nanos,
},
ComparatorType::Duration,
) => {
let mut result = Vec::new();
result.extend_from_slice(&months.to_be_bytes());
result.extend_from_slice(&days.to_be_bytes());
result.extend_from_slice(&nanos.to_be_bytes());
Ok(result)
}
_ => Err(Error::InvalidInput(format!(
"Type mismatch: value {:?} does not match comparator {:?}",
value, comparator
))),
}
}
/// Compares two values of the same variant in their natural (ascending) order.
///
/// * `Null` equals `Null` and sorts before every other value.
/// * Scalars use the ordering of their payload.
/// * Floats use `partial_cmp`; when that is undefined (a NaN is involved), NaN
///   sorts after every non-NaN value and NaN equals NaN. The previous behaviour
///   treated NaN as equal to *everything*, which made the order non-transitive
///   and therefore unsafe for the `Ord` impl of `ClusteringKey` (for example
///   inside a `BTreeMap`). Results for non-NaN inputs are unchanged.
/// * Lists, sets, maps and tuples compare element by element and then by length.
/// * `Frozen` values compare by their inner value.
///
/// # Errors
/// Returns `Error::InvalidInput` when the two values are of different variants,
/// or of a variant that has no comparison arm here (e.g. `Varint`, `Decimal`
/// and `Duration`, which are serializable in this file but not compared).
fn compare_values(a: &Value, b: &Value) -> Result<Ordering> {
    use Value::*;
    match (a, b) {
        (Null, Null) => Ok(Ordering::Equal),
        (Null, _) => Ok(Ordering::Less),
        (_, Null) => Ok(Ordering::Greater),
        (Boolean(a), Boolean(b)) => Ok(a.cmp(b)),
        (TinyInt(a), TinyInt(b)) => Ok(a.cmp(b)),
        (SmallInt(a), SmallInt(b)) => Ok(a.cmp(b)),
        (Integer(a), Integer(b)) => Ok(a.cmp(b)),
        (BigInt(a), BigInt(b)) => Ok(a.cmp(b)),
        (Counter(a), Counter(b)) => Ok(a.cmp(b)),
        // `partial_cmp` is `None` only when at least one side is NaN; order
        // NaN last (false < true) so the result stays a total order.
        (Float32(a), Float32(b)) => Ok(a
            .partial_cmp(b)
            .unwrap_or_else(|| a.is_nan().cmp(&b.is_nan()))),
        (Float(a), Float(b)) => Ok(a
            .partial_cmp(b)
            .unwrap_or_else(|| a.is_nan().cmp(&b.is_nan()))),
        (Text(a), Text(b)) => Ok(a.cmp(b)),
        (Blob(a), Blob(b)) => Ok(a.cmp(b)),
        (Timestamp(a), Timestamp(b)) => Ok(a.cmp(b)),
        (Date(a), Date(b)) => Ok(a.cmp(b)),
        (Time(a), Time(b)) => Ok(a.cmp(b)),
        (Uuid(a), Uuid(b)) => Ok(a.cmp(b)),
        (Inet(a), Inet(b)) => Ok(a.cmp(b)),
        (List(a), List(b)) | (Set(a), Set(b)) => {
            for (elem_a, elem_b) in a.iter().zip(b.iter()) {
                let ord = compare_values(elem_a, elem_b)?;
                if ord != Ordering::Equal {
                    return Ok(ord);
                }
            }
            // All shared positions are equal: the shorter collection sorts first.
            Ok(a.len().cmp(&b.len()))
        }
        (Map(a), Map(b)) => {
            for ((ka, va), (kb, vb)) in a.iter().zip(b.iter()) {
                let key_ord = compare_values(ka, kb)?;
                if key_ord != Ordering::Equal {
                    return Ok(key_ord);
                }
                let val_ord = compare_values(va, vb)?;
                if val_ord != Ordering::Equal {
                    return Ok(val_ord);
                }
            }
            Ok(a.len().cmp(&b.len()))
        }
        (Tuple(a), Tuple(b)) => {
            for (fa, fb) in a.iter().zip(b.iter()) {
                let ord = compare_values(fa, fb)?;
                if ord != Ordering::Equal {
                    return Ok(ord);
                }
            }
            Ok(a.len().cmp(&b.len()))
        }
        (Frozen(a), Frozen(b)) => compare_values(a, b),
        _ => Err(Error::InvalidInput(format!(
            "Cannot compare values of different types: {:?} vs {:?}",
            a, b
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ClusteringColumn, ClusteringOrder, KeyColumn};
    use std::collections::HashMap;

    /// Builds a minimal `TableSchema` from `(name, data_type)` partition columns
    /// and `(name, data_type, order)` clustering columns; positions follow the
    /// order in which the columns are given.
    fn create_test_schema(
        partition_cols: Vec<(&str, &str)>,
        clustering_cols: Vec<(&str, &str, ClusteringOrder)>,
    ) -> TableSchema {
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: partition_cols
                .into_iter()
                .enumerate()
                .map(|(i, (name, data_type))| KeyColumn {
                    name: name.to_string(),
                    data_type: data_type.to_string(),
                    position: i,
                })
                .collect(),
            clustering_keys: clustering_cols
                .into_iter()
                .enumerate()
                .map(|(i, (name, data_type, order))| ClusteringColumn {
                    name: name.to_string(),
                    data_type: data_type.to_string(),
                    position: i,
                    order,
                })
                .collect(),
            columns: vec![],
            comments: HashMap::new(),
        }
    }

    // `TableId` exposes its parts and renders as `keyspace.table`.
    #[test]
    fn test_table_id() {
        let table_id = TableId::new("my_keyspace", "my_table");
        assert_eq!(table_id.keyspace, "my_keyspace");
        assert_eq!(table_id.table, "my_table");
        assert_eq!(table_id.qualified_name(), "my_keyspace.my_table");
        assert_eq!(table_id.to_string(), "my_keyspace.my_table");
    }

    // A single-column key is the raw big-endian value with no framing.
    #[test]
    fn test_partition_key_single_int() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        let pk = PartitionKey::single("id", Value::Integer(42));
        let bytes = pk.to_bytes(&schema).unwrap();
        assert_eq!(bytes, vec![0x00, 0x00, 0x00, 0x2A]);
    }

    // Each component of a composite key is framed as: u16 length, value, 0x00.
    #[test]
    fn test_partition_key_multi_component() {
        let schema = create_test_schema(vec![("id", "int"), ("name", "text")], vec![]);
        let pk = PartitionKey::new(vec![
            ("id".to_string(), Value::Integer(42)),
            ("name".to_string(), Value::Text("hello".to_string())),
        ]);
        let bytes = pk.to_bytes(&schema).unwrap();
        let expected = vec![
            0x00, 0x04, 0x00, 0x00, 0x00, 0x2A, 0x00, // id = 42
            0x00, 0x05, b'h', b'e', b'l', b'l', b'o', 0x00, // name = "hello"
        ];
        assert_eq!(bytes, expected);
    }

    #[test]
    fn test_partition_key_three_components() {
        let schema = create_test_schema(
            vec![("symbol", "text"), ("exchange", "text"), ("bucket", "int")],
            vec![],
        );
        let pk = PartitionKey::new(vec![
            ("symbol".to_string(), Value::Text("AAPL".to_string())),
            ("exchange".to_string(), Value::Text("NYSE".to_string())),
            ("bucket".to_string(), Value::Integer(100)),
        ]);
        let bytes = pk.to_bytes(&schema).unwrap();
        let expected = vec![
            0x00, 0x04, b'A', b'A', b'P', b'L', 0x00, // symbol = "AAPL"
            0x00, 0x04, b'N', b'Y', b'S', b'E', 0x00, // exchange = "NYSE"
            0x00, 0x04, 0x00, 0x00, 0x00, 0x64, 0x00, // bucket = 100
        ];
        assert_eq!(bytes, expected);
    }

    // Token decides the order; key bytes break ties.
    #[test]
    fn test_decorated_key_ordering() {
        let dk1 = DecoratedKey::new(100, vec![1, 2, 3]);
        let dk2 = DecoratedKey::new(200, vec![1, 2, 3]);
        let dk3 = DecoratedKey::new(100, vec![1, 2, 4]);
        assert!(dk1 < dk2);
        assert!(dk2 > dk1);
        assert!(dk1 < dk3);
        assert!(dk3 > dk1);
        let dk4 = DecoratedKey::new(100, vec![1, 2, 3]);
        assert_eq!(dk1, dk4);
    }

    #[test]
    fn test_murmur3_token_empty_key() {
        let token = calculate_murmur3_token(&[]).unwrap();
        assert_eq!(token, i64::MIN);
    }

    #[test]
    fn test_murmur3_token_deterministic() {
        let key_bytes = b"test_key";
        let token1 = calculate_murmur3_token(key_bytes).unwrap();
        let token2 = calculate_murmur3_token(key_bytes).unwrap();
        assert_eq!(token1, token2, "Token calculation should be deterministic");
    }

    #[test]
    fn test_murmur3_token_different_keys() {
        let token1 = calculate_murmur3_token(b"key1").unwrap();
        let token2 = calculate_murmur3_token(b"key2").unwrap();
        assert_ne!(
            token1, token2,
            "Different keys should produce different tokens"
        );
    }

    // Pins the token for one known composite key byte string.
    #[test]
    fn test_murmur3_token_matches_cassandra_for_composite_uuid_key() {
        let key_bytes = vec![
            0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40, 0x00, 0x80, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40,
            0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xaa, 0x00,
        ];
        let token = calculate_murmur3_token(&key_bytes).unwrap();
        assert_eq!(token, -5_116_541_970_184_546_410);
    }

    #[test]
    fn test_decorated_key_from_bytes() {
        let key_bytes = vec![0x00, 0x00, 0x00, 0x2A];
        let dk = DecoratedKey::from_key_bytes(key_bytes.clone()).unwrap();
        assert_eq!(dk.key, key_bytes);
        let expected_token = calculate_murmur3_token(&key_bytes).unwrap();
        assert_eq!(dk.token, expected_token);
    }

    #[test]
    fn test_clustering_key_ordering() {
        let schema = create_test_schema(
            vec![("id", "int")],
            vec![("ts", "timestamp", ClusteringOrder::Asc)],
        );
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
        let ordering = ck1.compare(&ck2, &schema).unwrap();
        assert_eq!(ordering, Ordering::Less);
    }

    // A `Desc` clustering column reverses the natural ordering.
    #[test]
    fn test_clustering_key_desc_ordering() {
        let schema = create_test_schema(
            vec![("id", "int")],
            vec![("ts", "timestamp", ClusteringOrder::Desc)],
        );
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
        let ordering = ck1.compare(&ck2, &schema).unwrap();
        assert_eq!(ordering, Ordering::Greater);
    }

    #[test]
    fn test_mutation_creation() {
        let table_id = TableId::new("ks", "table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ops = vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }];
        let mutation = Mutation::new(table_id.clone(), pk, None, ops, 1234567890, None);
        assert_eq!(mutation.table.keyspace, "ks");
        assert_eq!(mutation.table.table, "table");
        assert_eq!(mutation.timestamp_micros, 1234567890);
        assert_eq!(mutation.ttl_seconds, None);
        assert_eq!(mutation.operations.len(), 1);
    }

    #[test]
    fn test_cell_operation_write() {
        let op = CellOperation::Write {
            column: "age".to_string(),
            value: Value::Integer(30),
        };
        match op {
            CellOperation::Write { column, value } => {
                assert_eq!(column, "age");
                assert_eq!(value, Value::Integer(30));
            }
            _ => panic!("Expected Write operation"),
        }
    }

    #[test]
    fn test_cell_operation_delete() {
        let op = CellOperation::Delete {
            column: "name".to_string(),
        };
        match op {
            CellOperation::Delete { column } => {
                assert_eq!(column, "name");
            }
            _ => panic!("Expected Delete operation"),
        }
    }

    #[test]
    fn test_cell_operation_delete_row() {
        let op = CellOperation::DeleteRow;
        assert!(matches!(op, CellOperation::DeleteRow));
    }

    #[test]
    fn test_serialize_value_types() {
        let bytes = serialize_value_bytes(&Value::Boolean(true), &ComparatorType::Boolean).unwrap();
        assert_eq!(bytes, vec![1]);
        let bytes = serialize_value_bytes(&Value::Integer(42), &ComparatorType::Int).unwrap();
        assert_eq!(bytes, vec![0x00, 0x00, 0x00, 0x2A]);
        let bytes = serialize_value_bytes(&Value::Text("hello".to_string()), &ComparatorType::Text)
            .unwrap();
        assert_eq!(bytes, b"hello");
        let uuid_bytes = [
            0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB,
            0xCD, 0xEF,
        ];
        let bytes = serialize_value_bytes(&Value::Uuid(uuid_bytes), &ComparatorType::Uuid).unwrap();
        assert_eq!(bytes, uuid_bytes);
    }

    #[test]
    fn test_compare_values() {
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Integer(2)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            compare_values(&Value::Integer(2), &Value::Integer(1)).unwrap(),
            Ordering::Greater
        );
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Integer(1)).unwrap(),
            Ordering::Equal
        );
        // Null sorts before any non-null value.
        assert_eq!(
            compare_values(&Value::Null, &Value::Integer(1)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Null).unwrap(),
            Ordering::Greater
        );
    }

    #[test]
    fn test_partition_key_to_decorated_key() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        let pk = PartitionKey::single("id", Value::Integer(42));
        let dk = pk.to_decorated_key(&schema).unwrap();
        assert_eq!(dk.key, vec![0x00, 0x00, 0x00, 0x2A]);
        let expected_token = calculate_murmur3_token(&dk.key).unwrap();
        assert_eq!(dk.token, expected_token);
    }

    #[test]
    fn test_murmur3_token_cassandra_compatibility() {
        let key1 = vec![0x00, 0x00, 0x00, 0x01];
        let token1 = calculate_murmur3_token(&key1).unwrap();
        assert_ne!(token1, 0, "Token should not be zero for non-zero input");
        let key2 = vec![0x00, 0x00, 0x00, 0x64];
        let token2 = calculate_murmur3_token(&key2).unwrap();
        assert_ne!(
            token2, token1,
            "Different keys should produce different tokens"
        );
        let key3 = b"test";
        let token3 = calculate_murmur3_token(key3).unwrap();
        assert_ne!(token3, token1);
        assert_ne!(token3, token2);
        let token1_repeat = calculate_murmur3_token(&key1).unwrap();
        assert_eq!(token1, token1_repeat, "Tokens must be deterministic");
    }

    #[test]
    fn test_decorated_key_btree_ordering() {
        use std::collections::BTreeMap;
        let mut map = BTreeMap::new();
        let dk3 = DecoratedKey::new(300, vec![3]);
        let dk1 = DecoratedKey::new(100, vec![1]);
        let dk2 = DecoratedKey::new(200, vec![2]);
        map.insert(dk3.clone(), "value3");
        map.insert(dk1.clone(), "value1");
        map.insert(dk2.clone(), "value2");
        let keys: Vec<_> = map.keys().collect();
        assert_eq!(keys[0].token, 100);
        assert_eq!(keys[1].token, 200);
        assert_eq!(keys[2].token, 300);
    }

    // Keys that share a token must stay distinct and ordered by their bytes.
    #[test]
    fn test_decorated_key_hash_collision_handling() {
        use std::collections::BTreeMap;
        let token = 12345_i64;
        let dk1 = DecoratedKey::new(token, vec![0x00, 0x01, 0x02]);
        let dk2 = DecoratedKey::new(token, vec![0x00, 0x01, 0x03]);
        let dk3 = DecoratedKey::new(token, vec![0x00, 0x01, 0x02]);
        assert!(dk1 < dk2, "Keys with same token should order by bytes");
        assert!(dk2 > dk1, "Key comparison should be consistent");
        assert_eq!(
            dk1.cmp(&dk3),
            Ordering::Equal,
            "Identical keys should be equal"
        );
        let mut map = BTreeMap::new();
        map.insert(dk2.clone(), "value2");
        map.insert(dk1.clone(), "value1");
        map.insert(dk3.clone(), "value3");
        // dk1 and dk3 are identical, so only two entries remain.
        assert_eq!(map.len(), 2);
        let keys: Vec<_> = map.keys().collect();
        assert_eq!(keys[0].key, vec![0x00, 0x01, 0x02]);
        assert_eq!(keys[1].key, vec![0x00, 0x01, 0x03]);
    }

    #[test]
    fn test_clustering_key_ord_valid_comparison() {
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
        let ck3 = ClusteringKey::single("ts", Value::Timestamp(1000));
        assert_eq!(ck1.cmp(&ck2), Ordering::Less);
        assert_eq!(ck2.cmp(&ck1), Ordering::Greater);
        assert_eq!(ck1.cmp(&ck3), Ordering::Equal);
        let ck_multi1 = ClusteringKey::new(vec![
            ("year".to_string(), Value::Integer(2024)),
            ("month".to_string(), Value::SmallInt(1)),
        ]);
        let ck_multi2 = ClusteringKey::new(vec![
            ("year".to_string(), Value::Integer(2024)),
            ("month".to_string(), Value::SmallInt(2)),
        ]);
        assert_eq!(ck_multi1.cmp(&ck_multi2), Ordering::Less);
    }

    // Mismatched value types must still yield a deterministic, antisymmetric order.
    #[test]
    fn test_clustering_key_ord_type_mismatch_is_total_and_does_not_panic() {
        use std::collections::BTreeMap;
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Integer(2000));
        let ord_12 = ck1.cmp(&ck2);
        let ord_21 = ck2.cmp(&ck1);
        assert_eq!(ord_12, ck1.cmp(&ck2), "comparison must be deterministic");
        assert_ne!(
            ord_12,
            Ordering::Equal,
            "mismatched types must not compare Equal"
        );
        assert_eq!(
            ord_12.reverse(),
            ord_21,
            "ordering must be antisymmetric (a.cmp(b) == b.cmp(a).reverse())"
        );
        assert_eq!(ck1.cmp(&ck1), Ordering::Equal);
        let mut map = BTreeMap::new();
        map.insert(ck1.clone(), "a");
        map.insert(ck2.clone(), "b");
        assert_eq!(map.len(), 2, "both distinct keys should be retained");
    }

    #[test]
    fn test_clustering_key_ord_btree_ordering() {
        use std::collections::BTreeMap;
        let mut map = BTreeMap::new();
        let ck3 = ClusteringKey::single("ts", Value::Timestamp(3000));
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
        map.insert(ck3.clone(), "value3");
        map.insert(ck1.clone(), "value1");
        map.insert(ck2.clone(), "value2");
        let values: Vec<_> = map.values().copied().collect();
        assert_eq!(values, vec!["value1", "value2", "value3"]);
    }

    // Frozen lists compare element by element, then by length.
    #[test]
    fn test_compare_frozen_list_values() {
        let list_a = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
        ])));
        let list_b = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("c".to_string()),
        ])));
        let list_c = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
            Value::Text("c".to_string()),
        ])));
        assert_eq!(compare_values(&list_a, &list_a).unwrap(), Ordering::Equal);
        assert_eq!(compare_values(&list_a, &list_b).unwrap(), Ordering::Less);
        assert_eq!(compare_values(&list_b, &list_a).unwrap(), Ordering::Greater);
        assert_eq!(compare_values(&list_a, &list_c).unwrap(), Ordering::Less);
        assert_eq!(compare_values(&list_c, &list_a).unwrap(), Ordering::Greater);
    }

    #[test]
    fn test_frozen_list_clustering_key_btree_ordering() {
        use std::collections::BTreeMap;
        let mut map = BTreeMap::new();
        let ck_2elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_0_0".to_string()),
                Value::Text("ck_0_1".to_string()),
            ]))),
        );
        let ck_3elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_1_0".to_string()),
                Value::Text("ck_1_1".to_string()),
                Value::Text("ck_1_2".to_string()),
            ]))),
        );
        let ck_4elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_2_0".to_string()),
                Value::Text("ck_2_1".to_string()),
                Value::Text("ck_2_2".to_string()),
                Value::Text("ck_2_3".to_string()),
            ]))),
        );
        map.insert(ck_4elem.clone(), "4elem");
        map.insert(ck_2elem.clone(), "2elem");
        map.insert(ck_3elem.clone(), "3elem");
        assert_eq!(map.len(), 3, "All frozen list CKs should be distinct");
        let values: Vec<_> = map.values().copied().collect();
        assert_eq!(values, vec!["2elem", "3elem", "4elem"]);
    }

    // Round-trip tests: `from_bytes(to_bytes(key))` must reproduce the key.
    #[test]
    fn test_partition_key_from_bytes_single_int() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        let original = PartitionKey::single("id", Value::Integer(42));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_single_uuid() {
        let schema = create_test_schema(vec![("id", "uuid")], vec![]);
        let uuid_bytes = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
        let original = PartitionKey::single("id", Value::Uuid(uuid_bytes));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_single_text() {
        let schema = create_test_schema(vec![("name", "text")], vec![]);
        let original = PartitionKey::single("name", Value::Text("hello".to_string()));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_multi_component() {
        let schema = create_test_schema(vec![("tenant", "text"), ("id", "int")], vec![]);
        let original = PartitionKey::new(vec![
            ("tenant".to_string(), Value::Text("acme".to_string())),
            ("id".to_string(), Value::Integer(99)),
        ]);
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_empty_errors() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        assert!(PartitionKey::from_bytes(&[], &schema).is_err());
    }

    #[test]
    fn test_clustering_key_cmp_type_mismatch_does_not_panic() {
        let key_a = ClusteringKey {
            columns: vec![("col".to_string(), Value::Integer(1))],
        };
        let key_b = ClusteringKey {
            columns: vec![("col".to_string(), Value::Text("1".to_string()))],
        };
        let first = key_a.cmp(&key_b);
        let second = key_a.cmp(&key_b);
        assert_eq!(first, second);
        assert_eq!(key_a.cmp(&key_a), Ordering::Equal);
    }
}