1use crate::error::{Error, Result};
14use crate::schema::{ClusteringOrder, TableSchema};
15use crate::types::{ComparatorType, Value};
16use std::cmp::Ordering;
17
/// Identifies a table by its keyspace and table name.
///
/// Rendered as `keyspace.table` by `TableId::qualified_name` and by the `Display` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct TableId {
    /// Keyspace component of the identifier.
    pub keyspace: String,
    /// Table component of the identifier.
    pub table: String,
}
26
27impl TableId {
28 pub fn new(keyspace: impl Into<String>, table: impl Into<String>) -> Self {
30 Self {
31 keyspace: keyspace.into(),
32 table: table.into(),
33 }
34 }
35
36 pub fn qualified_name(&self) -> String {
38 format!("{}.{}", self.keyspace, self.table)
39 }
40}
41
42impl std::fmt::Display for TableId {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "{}.{}", self.keyspace, self.table)
45 }
46}
47
/// A change addressed to one partition of a table.
///
/// Bundles the target table and keys with the cell-level operations and the
/// optional partition/range tombstones carried by the change.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Mutation {
    /// Table the mutation targets.
    pub table: TableId,
    /// Partition key of the affected partition.
    pub partition_key: PartitionKey,
    /// Clustering key of the affected row, if any.
    // NOTE(review): presumably `None` for partition-level or static writes — confirm with callers.
    pub clustering_key: Option<ClusteringKey>,
    /// Cell-level operations carried by this mutation.
    pub operations: Vec<CellOperation>,
    /// Write timestamp; microseconds per the field name (the epoch is not visible in this file).
    pub timestamp_micros: i64,
    /// Optional time-to-live; seconds per the field name.
    pub ttl_seconds: Option<u32>,
    /// Optional partition-level tombstone; `Mutation::new` initializes it to `None`.
    pub partition_tombstone: Option<PartitionTombstone>,
    /// Range tombstones attached to this mutation; `Mutation::new` starts with an empty list.
    pub range_tombstones: Vec<RangeTombstone>,
}
85
86impl Mutation {
87 pub fn new(
89 table: TableId,
90 partition_key: PartitionKey,
91 clustering_key: Option<ClusteringKey>,
92 operations: Vec<CellOperation>,
93 timestamp_micros: i64,
94 ttl_seconds: Option<u32>,
95 ) -> Self {
96 Self {
97 table,
98 partition_key,
99 clustering_key,
100 operations,
101 timestamp_micros,
102 ttl_seconds,
103 partition_tombstone: None,
104 range_tombstones: Vec::new(),
105 }
106 }
107
108 pub fn decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
110 self.partition_key.to_decorated_key(schema)
111 }
112}
113
/// Deletion marker that a `Mutation` can carry at partition level
/// (see `Mutation::partition_tombstone`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PartitionTombstone {
    /// Deletion timestamp.
    // NOTE(review): unit not visible here; presumably microseconds like
    // `Mutation::timestamp_micros` — confirm.
    pub deletion_time: i64,
    /// Local deletion time.
    // NOTE(review): presumably seconds since the Unix epoch — confirm against the writer.
    pub local_deletion_time: i32,
}
125
/// Deletion marker covering a range of clustering positions between two bounds
/// (see `Mutation::range_tombstones`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RangeTombstone {
    /// Bound at which the range starts.
    pub start: ClusteringBound,
    /// Bound at which the range ends.
    pub end: ClusteringBound,
    /// Deletion timestamp.
    // NOTE(review): unit not visible here; presumably microseconds like
    // `Mutation::timestamp_micros` — confirm.
    pub deletion_time: i64,
    /// Local deletion time.
    // NOTE(review): presumably seconds since the Unix epoch — confirm against the writer.
    pub local_deletion_time: i32,
}
141
/// One end of a clustering range, as used by `RangeTombstone::start` and `RangeTombstone::end`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ClusteringBound {
    /// Bound at the given clustering key, with the key itself included in the range.
    Inclusive(ClusteringKey),
    /// Bound at the given clustering key, with the key itself excluded from the range.
    Exclusive(ClusteringKey),
    /// Bound without a clustering key.
    // NOTE(review): presumably "before every clustering key" — confirm with the code that
    // interprets bounds.
    Bottom,
    /// Bound without a clustering key.
    // NOTE(review): presumably "after every clustering key" — confirm with the code that
    // interprets bounds.
    Top,
}
157
/// A single operation carried in `Mutation::operations`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CellOperation {
    /// Write `value` to `column`.
    Write {
        /// Name of the column being written.
        column: String,
        /// Value to store.
        value: Value,
    },
    /// Write `value` to `column` with a TTL attached to this operation.
    // NOTE(review): how this interacts with `Mutation::ttl_seconds` is not visible here — confirm.
    WriteWithTtl {
        /// Name of the column being written.
        column: String,
        /// Value to store.
        value: Value,
        /// Time-to-live; seconds per the field name.
        ttl_seconds: u32,
    },
    /// Delete the named column.
    Delete {
        /// Name of the column being deleted.
        column: String,
    },
    /// Delete the whole row; carries no column name.
    DeleteRow,
}
201
/// Partition key expressed as ordered `(column name, value)` pairs.
///
/// `PartitionKey::to_bytes` matches entries to `schema.partition_keys` by position,
/// so the order of `columns` must follow the schema.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PartitionKey {
    /// Key components as `(column name, value)` pairs, in schema order.
    pub columns: Vec<(String, Value)>,
}
211
212impl PartitionKey {
213 pub fn new(columns: Vec<(String, Value)>) -> Self {
215 Self { columns }
216 }
217
218 pub fn single(column: impl Into<String>, value: Value) -> Self {
220 Self {
221 columns: vec![(column.into(), value)],
222 }
223 }
224
225 pub fn to_bytes(&self, schema: &TableSchema) -> Result<Vec<u8>> {
231 if self.columns.is_empty() {
232 return Err(Error::InvalidInput("Empty partition key".to_string()));
233 }
234
235 if self.columns.len() != schema.partition_keys.len() {
237 return Err(Error::InvalidInput(format!(
238 "Partition key column count mismatch: expected {}, got {}",
239 schema.partition_keys.len(),
240 self.columns.len()
241 )));
242 }
243
244 let mut result = Vec::new();
245
246 if self.columns.len() == 1 {
248 let value_bytes =
249 self.serialize_value(&self.columns[0].1, &schema.partition_keys[0])?;
250 result.extend_from_slice(&value_bytes);
251 return Ok(result);
252 }
253
254 for (i, (_, value)) in self.columns.iter().enumerate() {
257 let value_bytes = self.serialize_value(value, &schema.partition_keys[i])?;
258 let len = value_bytes.len();
259 if len > u16::MAX as usize {
260 return Err(Error::InvalidInput(format!(
261 "Partition key component too large: {} bytes",
262 len
263 )));
264 }
265 result.extend_from_slice(&(len as u16).to_be_bytes());
267 result.extend_from_slice(&value_bytes);
268 result.push(0x00);
269 }
270
271 Ok(result)
272 }
273
274 pub fn to_decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
276 let key_bytes = self.to_bytes(schema)?;
277 let token = calculate_murmur3_token(&key_bytes)?;
278 Ok(DecoratedKey::new(token, key_bytes))
279 }
280
281 pub fn from_bytes(data: &[u8], schema: &TableSchema) -> Result<Self> {
286 if schema.partition_keys.is_empty() {
287 return Err(Error::InvalidInput(
288 "Schema has no partition keys".to_string(),
289 ));
290 }
291
292 if data.is_empty() {
293 return Err(Error::InvalidInput("Empty partition key bytes".to_string()));
294 }
295
296 let columns =
299 crate::storage::partition_key_codec::decode_partition_key_columns(data, schema)?;
300
301 Ok(PartitionKey { columns })
302 }
303
304 fn serialize_value(
306 &self,
307 value: &Value,
308 key_column: &crate::schema::KeyColumn,
309 ) -> Result<Vec<u8>> {
310 let comparator = ComparatorType::from_data_type(&key_column.data_type)?;
312
313 serialize_value_bytes(value, &comparator)
314 }
315}
316
/// Clustering key expressed as ordered `(column name, value)` pairs.
///
/// `PartialEq` is derived (it compares names and values), while `Eq`, `Ord` and
/// `PartialOrd` are implemented by hand in this file; the `Ord` impl compares
/// values only and ignores column names.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ClusteringKey {
    /// Key components as `(column name, value)` pairs, in clustering order.
    pub columns: Vec<(String, Value)>,
}
326
327impl ClusteringKey {
328 pub fn new(columns: Vec<(String, Value)>) -> Self {
330 Self { columns }
331 }
332
333 pub fn single(column: impl Into<String>, value: Value) -> Self {
335 Self {
336 columns: vec![(column.into(), value)],
337 }
338 }
339
340 pub fn compare(&self, other: &Self, schema: &TableSchema) -> Result<Ordering> {
345 for (i, ((_, a_val), (_, b_val))) in
347 self.columns.iter().zip(other.columns.iter()).enumerate()
348 {
349 if i >= schema.clustering_keys.len() {
350 return Err(Error::Schema(format!(
351 "Clustering key has more columns than schema: {} > {}",
352 i + 1,
353 schema.clustering_keys.len()
354 )));
355 }
356
357 let cluster_col = &schema.clustering_keys[i];
358 let ordering = compare_values(a_val, b_val)?;
359
360 let final_ordering = if cluster_col.order == ClusteringOrder::Desc {
362 ordering.reverse()
363 } else {
364 ordering
365 };
366
367 if final_ordering != Ordering::Equal {
368 return Ok(final_ordering);
369 }
370 }
371
372 Ok(Ordering::Equal)
373 }
374}
375
376impl Ord for ClusteringKey {
377 fn cmp(&self, other: &Self) -> Ordering {
378 for ((_, a_val), (_, b_val)) in self.columns.iter().zip(other.columns.iter()) {
382 let ordering = compare_values(a_val, b_val).unwrap_or_else(|_| {
383 format!("{a_val:?}").cmp(&format!("{b_val:?}"))
387 });
388 if ordering != Ordering::Equal {
389 return ordering;
390 }
391 }
392 self.columns.len().cmp(&other.columns.len())
393 }
394}
395
396impl PartialOrd for ClusteringKey {
397 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
398 Some(self.cmp(other))
399 }
400}
401
// Marker impl: asserts that the derived `PartialEq` on `ClusteringKey` is a full
// equivalence relation. `Ord` requires `Eq`, so this is needed for the `Ord` impl.
// NOTE(review): this only holds if `Value` equality is reflexive (e.g. no NaN floats) — confirm.
impl Eq for ClusteringKey {}
403
/// Serialized partition key bytes together with their token.
///
/// The `Ord` impl in this file orders by `token` first and by the raw `key`
/// bytes second.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DecoratedKey {
    /// Token of the key; `DecoratedKey::from_key_bytes` derives it from `key`.
    pub token: i64,
    /// Serialized partition key bytes.
    pub key: Vec<u8>,
}
425
426impl DecoratedKey {
427 pub fn new(token: i64, key: Vec<u8>) -> Self {
429 Self { token, key }
430 }
431
432 pub fn from_key_bytes(key_bytes: Vec<u8>) -> Result<Self> {
434 let token = calculate_murmur3_token(&key_bytes)?;
435 Ok(Self::new(token, key_bytes))
436 }
437}
438
439impl Ord for DecoratedKey {
440 fn cmp(&self, other: &Self) -> Ordering {
441 match self.token.cmp(&other.token) {
443 Ordering::Equal => {
444 self.key.cmp(&other.key)
446 }
447 other => other,
448 }
449 }
450}
451
452impl PartialOrd for DecoratedKey {
453 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
454 Some(self.cmp(other))
455 }
456}
457
458fn calculate_murmur3_token(key_bytes: &[u8]) -> Result<i64> {
465 if key_bytes.is_empty() {
466 return Ok(i64::MIN);
467 }
468
469 Ok(crate::util::cassandra_murmur3::cassandra_murmur3_token(
470 key_bytes,
471 ))
472}
473
474fn serialize_value_bytes(value: &Value, comparator: &ComparatorType) -> Result<Vec<u8>> {
479 match (value, comparator) {
480 (Value::Null, _) => Ok(Vec::new()),
481
482 (Value::Boolean(b), ComparatorType::Boolean) => Ok(vec![if *b { 1 } else { 0 }]),
483
484 (Value::TinyInt(n), ComparatorType::TinyInt) => Ok(vec![*n as u8]),
485
486 (Value::SmallInt(n), ComparatorType::SmallInt) => Ok(n.to_be_bytes().to_vec()),
487
488 (Value::Integer(n), ComparatorType::Int) => Ok(n.to_be_bytes().to_vec()),
489
490 (Value::BigInt(n), ComparatorType::BigInt) => Ok(n.to_be_bytes().to_vec()),
491
492 (Value::Counter(n), ComparatorType::Counter) => Ok(n.to_be_bytes().to_vec()),
493
494 (Value::Float32(f), ComparatorType::Float32) => Ok(f.to_bits().to_be_bytes().to_vec()),
495
496 (Value::Float(f), ComparatorType::Float) => Ok(f.to_bits().to_be_bytes().to_vec()),
497
498 (Value::Text(s), ComparatorType::Text) => Ok(s.as_bytes().to_vec()),
499
500 (Value::Blob(bytes), ComparatorType::Blob) => Ok(bytes.clone()),
501
502 (Value::Timestamp(millis), ComparatorType::Timestamp) => Ok(millis.to_be_bytes().to_vec()),
503
504 (Value::Date(days), ComparatorType::Date) => {
505 let stored = days.wrapping_sub(i32::MIN) as u32;
507 Ok(stored.to_be_bytes().to_vec())
508 }
509
510 (Value::Uuid(bytes), ComparatorType::Uuid) => Ok(bytes.to_vec()),
511
512 (Value::Time(nanos), ComparatorType::Custom(name)) if name == "time" => {
514 Ok(nanos.to_be_bytes().to_vec())
515 }
516
517 (Value::Inet(bytes), ComparatorType::Custom(name)) if name == "inet" => Ok(bytes.clone()),
518
519 (Value::Varint(bytes), ComparatorType::Varint) => Ok(bytes.clone()),
520
521 (Value::Decimal { scale, unscaled }, ComparatorType::Decimal) => {
522 let mut result = Vec::new();
524 result.extend_from_slice(&scale.to_be_bytes());
525 result.extend_from_slice(unscaled);
526 Ok(result)
527 }
528
529 (
530 Value::Duration {
531 months,
532 days,
533 nanos,
534 },
535 ComparatorType::Duration,
536 ) => {
537 let mut result = Vec::new();
539 result.extend_from_slice(&months.to_be_bytes());
540 result.extend_from_slice(&days.to_be_bytes());
541 result.extend_from_slice(&nanos.to_be_bytes());
542 Ok(result)
543 }
544
545 _ => Err(Error::InvalidInput(format!(
546 "Type mismatch: value {:?} does not match comparator {:?}",
547 value, comparator
548 ))),
549 }
550}
551
552fn compare_values(a: &Value, b: &Value) -> Result<Ordering> {
554 use Value::*;
555
556 match (a, b) {
557 (Null, Null) => Ok(Ordering::Equal),
558 (Null, _) => Ok(Ordering::Less),
559 (_, Null) => Ok(Ordering::Greater),
560
561 (Boolean(a), Boolean(b)) => Ok(a.cmp(b)),
562 (TinyInt(a), TinyInt(b)) => Ok(a.cmp(b)),
563 (SmallInt(a), SmallInt(b)) => Ok(a.cmp(b)),
564 (Integer(a), Integer(b)) => Ok(a.cmp(b)),
565 (BigInt(a), BigInt(b)) => Ok(a.cmp(b)),
566 (Counter(a), Counter(b)) => Ok(a.cmp(b)),
567 (Float32(a), Float32(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
568 (Float(a), Float(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
569 (Text(a), Text(b)) => Ok(a.cmp(b)),
570 (Blob(a), Blob(b)) => Ok(a.cmp(b)),
571 (Timestamp(a), Timestamp(b)) => Ok(a.cmp(b)),
572 (Date(a), Date(b)) => Ok(a.cmp(b)),
573 (Time(a), Time(b)) => Ok(a.cmp(b)),
574 (Uuid(a), Uuid(b)) => Ok(a.cmp(b)),
575 (Inet(a), Inet(b)) => Ok(a.cmp(b)),
576
577 (List(a), List(b)) | (Set(a), Set(b)) => {
579 for (elem_a, elem_b) in a.iter().zip(b.iter()) {
580 let ord = compare_values(elem_a, elem_b)?;
581 if ord != Ordering::Equal {
582 return Ok(ord);
583 }
584 }
585 Ok(a.len().cmp(&b.len()))
586 }
587 (Map(a), Map(b)) => {
588 for ((ka, va), (kb, vb)) in a.iter().zip(b.iter()) {
589 let key_ord = compare_values(ka, kb)?;
590 if key_ord != Ordering::Equal {
591 return Ok(key_ord);
592 }
593 let val_ord = compare_values(va, vb)?;
594 if val_ord != Ordering::Equal {
595 return Ok(val_ord);
596 }
597 }
598 Ok(a.len().cmp(&b.len()))
599 }
600 (Tuple(a), Tuple(b)) => {
601 for (fa, fb) in a.iter().zip(b.iter()) {
602 let ord = compare_values(fa, fb)?;
603 if ord != Ordering::Equal {
604 return Ok(ord);
605 }
606 }
607 Ok(a.len().cmp(&b.len()))
608 }
609
610 (Frozen(a), Frozen(b)) => compare_values(a, b),
612
613 _ => Err(Error::InvalidInput(format!(
614 "Cannot compare values of different types: {:?} vs {:?}",
615 a, b
616 ))),
617 }
618}
619
// Unit tests for the key, mutation and comparison types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ClusteringColumn, ClusteringOrder, KeyColumn};
    use std::collections::HashMap;

    // Builds a `TableSchema` from `(name, data type)` partition columns and
    // `(name, data type, order)` clustering columns; positions follow list order.
    fn create_test_schema(
        partition_cols: Vec<(&str, &str)>,
        clustering_cols: Vec<(&str, &str, ClusteringOrder)>,
    ) -> TableSchema {
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: partition_cols
                .into_iter()
                .enumerate()
                .map(|(i, (name, data_type))| KeyColumn {
                    name: name.to_string(),
                    data_type: data_type.to_string(),
                    position: i,
                })
                .collect(),
            clustering_keys: clustering_cols
                .into_iter()
                .enumerate()
                .map(|(i, (name, data_type, order))| ClusteringColumn {
                    name: name.to_string(),
                    data_type: data_type.to_string(),
                    position: i,
                    order,
                })
                .collect(),
            columns: vec![],
            comments: HashMap::new(),
        }
    }

    // `TableId` exposes its parts and formats as `keyspace.table`.
    #[test]
    fn test_table_id() {
        let table_id = TableId::new("my_keyspace", "my_table");
        assert_eq!(table_id.keyspace, "my_keyspace");
        assert_eq!(table_id.table, "my_table");
        assert_eq!(table_id.qualified_name(), "my_keyspace.my_table");
        assert_eq!(table_id.to_string(), "my_keyspace.my_table");
    }

    // A single-column key serializes to the raw value bytes.
    #[test]
    fn test_partition_key_single_int() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        let pk = PartitionKey::single("id", Value::Integer(42));

        let bytes = pk.to_bytes(&schema).unwrap();
        assert_eq!(bytes, vec![0x00, 0x00, 0x00, 0x2A]);
    }

    // Multi-column keys use <u16 length><value><0x00> per component.
    #[test]
    fn test_partition_key_multi_component() {
        let schema = create_test_schema(vec![("id", "int"), ("name", "text")], vec![]);
        let pk = PartitionKey::new(vec![
            ("id".to_string(), Value::Integer(42)),
            ("name".to_string(), Value::Text("hello".to_string())),
        ]);

        let bytes = pk.to_bytes(&schema).unwrap();
        let expected = vec![
            // id: length 4, value 42, terminator
            0x00, 0x04, 0x00, 0x00, 0x00, 0x2A, 0x00,
            // name: length 5, "hello", terminator
            0x00, 0x05, b'h', b'e', b'l', b'l', b'o', 0x00,
        ];
        assert_eq!(bytes, expected);
    }

    // Same framing with three components.
    #[test]
    fn test_partition_key_three_components() {
        let schema = create_test_schema(
            vec![("symbol", "text"), ("exchange", "text"), ("bucket", "int")],
            vec![],
        );
        let pk = PartitionKey::new(vec![
            ("symbol".to_string(), Value::Text("AAPL".to_string())),
            ("exchange".to_string(), Value::Text("NYSE".to_string())),
            ("bucket".to_string(), Value::Integer(100)),
        ]);

        let bytes = pk.to_bytes(&schema).unwrap();
        let expected = vec![
            // symbol: length 4, "AAPL", terminator
            0x00, 0x04, b'A', b'A', b'P', b'L', 0x00,
            // exchange: length 4, "NYSE", terminator
            0x00, 0x04, b'N', b'Y', b'S', b'E', 0x00,
            // bucket: length 4, value 100, terminator
            0x00, 0x04, 0x00, 0x00, 0x00, 0x64, 0x00,
        ];
        assert_eq!(bytes, expected);
    }

    // Token decides first; equal tokens fall back to key bytes.
    #[test]
    fn test_decorated_key_ordering() {
        let dk1 = DecoratedKey::new(100, vec![1, 2, 3]);
        let dk2 = DecoratedKey::new(200, vec![1, 2, 3]);
        let dk3 = DecoratedKey::new(100, vec![1, 2, 4]);

        assert!(dk1 < dk2);
        assert!(dk2 > dk1);

        assert!(dk1 < dk3);
        assert!(dk3 > dk1);

        let dk4 = DecoratedKey::new(100, vec![1, 2, 3]);
        assert_eq!(dk1, dk4);
    }

    // An empty key maps to `i64::MIN`.
    #[test]
    fn test_murmur3_token_empty_key() {
        let token = calculate_murmur3_token(&[]).unwrap();
        assert_eq!(token, i64::MIN);
    }

    #[test]
    fn test_murmur3_token_deterministic() {
        let key_bytes = b"test_key";
        let token1 = calculate_murmur3_token(key_bytes).unwrap();
        let token2 = calculate_murmur3_token(key_bytes).unwrap();
        assert_eq!(token1, token2, "Token calculation should be deterministic");
    }

    #[test]
    fn test_murmur3_token_different_keys() {
        let token1 = calculate_murmur3_token(b"key1").unwrap();
        let token2 = calculate_murmur3_token(b"key2").unwrap();
        assert_ne!(
            token1, token2,
            "Different keys should produce different tokens"
        );
    }

    // Pins the token for one fixed composite key byte string.
    #[test]
    fn test_murmur3_token_matches_cassandra_for_composite_uuid_key() {
        let key_bytes = vec![
            0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40, 0x00, 0x80, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40,
            0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xaa, 0x00,
        ];

        let token = calculate_murmur3_token(&key_bytes).unwrap();
        assert_eq!(token, -5_116_541_970_184_546_410);
    }

    // `from_key_bytes` keeps the bytes and computes the matching token.
    #[test]
    fn test_decorated_key_from_bytes() {
        let key_bytes = vec![0x00, 0x00, 0x00, 0x2A];
        let dk = DecoratedKey::from_key_bytes(key_bytes.clone()).unwrap();

        assert_eq!(dk.key, key_bytes);
        let expected_token = calculate_murmur3_token(&key_bytes).unwrap();
        assert_eq!(dk.token, expected_token);
    }

    #[test]
    fn test_clustering_key_ordering() {
        let schema = create_test_schema(
            vec![("id", "int")],
            vec![("ts", "timestamp", ClusteringOrder::Asc)],
        );

        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));

        let ordering = ck1.compare(&ck2, &schema).unwrap();
        assert_eq!(ordering, Ordering::Less);
    }

    // A `Desc` clustering column reverses the natural ordering.
    #[test]
    fn test_clustering_key_desc_ordering() {
        let schema = create_test_schema(
            vec![("id", "int")],
            vec![("ts", "timestamp", ClusteringOrder::Desc)],
        );

        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));

        let ordering = ck1.compare(&ck2, &schema).unwrap();
        assert_eq!(ordering, Ordering::Greater);
    }

    #[test]
    fn test_mutation_creation() {
        let table_id = TableId::new("ks", "table");
        let pk = PartitionKey::single("id", Value::Integer(1));
        let ops = vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }];

        let mutation = Mutation::new(table_id.clone(), pk, None, ops, 1234567890, None);

        assert_eq!(mutation.table.keyspace, "ks");
        assert_eq!(mutation.table.table, "table");
        assert_eq!(mutation.timestamp_micros, 1234567890);
        assert_eq!(mutation.ttl_seconds, None);
        assert_eq!(mutation.operations.len(), 1);
    }

    #[test]
    fn test_cell_operation_write() {
        let op = CellOperation::Write {
            column: "age".to_string(),
            value: Value::Integer(30),
        };

        match op {
            CellOperation::Write { column, value } => {
                assert_eq!(column, "age");
                assert_eq!(value, Value::Integer(30));
            }
            _ => panic!("Expected Write operation"),
        }
    }

    #[test]
    fn test_cell_operation_delete() {
        let op = CellOperation::Delete {
            column: "name".to_string(),
        };

        match op {
            CellOperation::Delete { column } => {
                assert_eq!(column, "name");
            }
            _ => panic!("Expected Delete operation"),
        }
    }

    #[test]
    fn test_cell_operation_delete_row() {
        let op = CellOperation::DeleteRow;
        assert!(matches!(op, CellOperation::DeleteRow));
    }

    // Spot-checks the byte encoding of a few value types.
    #[test]
    fn test_serialize_value_types() {
        let bytes = serialize_value_bytes(&Value::Boolean(true), &ComparatorType::Boolean).unwrap();
        assert_eq!(bytes, vec![1]);

        let bytes = serialize_value_bytes(&Value::Integer(42), &ComparatorType::Int).unwrap();
        assert_eq!(bytes, vec![0x00, 0x00, 0x00, 0x2A]);

        let bytes = serialize_value_bytes(&Value::Text("hello".to_string()), &ComparatorType::Text)
            .unwrap();
        assert_eq!(bytes, b"hello");

        let uuid_bytes = [
            0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB,
            0xCD, 0xEF,
        ];
        let bytes = serialize_value_bytes(&Value::Uuid(uuid_bytes), &ComparatorType::Uuid).unwrap();
        assert_eq!(bytes, uuid_bytes);
    }

    #[test]
    fn test_compare_values() {
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Integer(2)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            compare_values(&Value::Integer(2), &Value::Integer(1)).unwrap(),
            Ordering::Greater
        );
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Integer(1)).unwrap(),
            Ordering::Equal
        );

        // Null sorts before any non-null value.
        assert_eq!(
            compare_values(&Value::Null, &Value::Integer(1)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            compare_values(&Value::Integer(1), &Value::Null).unwrap(),
            Ordering::Greater
        );
    }

    #[test]
    fn test_partition_key_to_decorated_key() {
        let schema = create_test_schema(vec![("id", "int")], vec![]);
        let pk = PartitionKey::single("id", Value::Integer(42));

        let dk = pk.to_decorated_key(&schema).unwrap();
        assert_eq!(dk.key, vec![0x00, 0x00, 0x00, 0x2A]);

        let expected_token = calculate_murmur3_token(&dk.key).unwrap();
        assert_eq!(dk.token, expected_token);
    }

    // Sanity checks only (non-zero, distinct, deterministic); no reference token is asserted here.
    #[test]
    fn test_murmur3_token_cassandra_compatibility() {
        let key1 = vec![0x00, 0x00, 0x00, 0x01];
        let token1 = calculate_murmur3_token(&key1).unwrap();
        assert_ne!(token1, 0, "Token should not be zero for non-zero input");

        let key2 = vec![0x00, 0x00, 0x00, 0x64];
        let token2 = calculate_murmur3_token(&key2).unwrap();
        assert_ne!(
            token2, token1,
            "Different keys should produce different tokens"
        );

        let key3 = b"test";
        let token3 = calculate_murmur3_token(key3).unwrap();
        assert_ne!(token3, token1);
        assert_ne!(token3, token2);

        let token1_repeat = calculate_murmur3_token(&key1).unwrap();
        assert_eq!(token1, token1_repeat, "Tokens must be deterministic");
    }

    // A `BTreeMap` keyed by `DecoratedKey` iterates in token order.
    #[test]
    fn test_decorated_key_btree_ordering() {
        use std::collections::BTreeMap;

        let mut map = BTreeMap::new();

        let dk3 = DecoratedKey::new(300, vec![3]);
        let dk1 = DecoratedKey::new(100, vec![1]);
        let dk2 = DecoratedKey::new(200, vec![2]);

        map.insert(dk3.clone(), "value3");
        map.insert(dk1.clone(), "value1");
        map.insert(dk2.clone(), "value2");

        let keys: Vec<_> = map.keys().collect();
        assert_eq!(keys[0].token, 100);
        assert_eq!(keys[1].token, 200);
        assert_eq!(keys[2].token, 300);
    }

    // Keys sharing a token are distinguished and ordered by their bytes.
    #[test]
    fn test_decorated_key_hash_collision_handling() {
        let token = 12345_i64;
        let dk1 = DecoratedKey::new(token, vec![0x00, 0x01, 0x02]);
        let dk2 = DecoratedKey::new(token, vec![0x00, 0x01, 0x03]);
        let dk3 = DecoratedKey::new(token, vec![0x00, 0x01, 0x02]);
        assert!(dk1 < dk2, "Keys with same token should order by bytes");
        assert!(dk2 > dk1, "Key comparison should be consistent");
        assert_eq!(
            dk1.cmp(&dk3),
            Ordering::Equal,
            "Identical keys should be equal"
        );

        use std::collections::BTreeMap;
        let mut map = BTreeMap::new();

        map.insert(dk2.clone(), "value2");
        map.insert(dk1.clone(), "value1");
        // dk3 equals dk1, so this insert replaces the existing entry.
        map.insert(dk3.clone(), "value3");
        assert_eq!(map.len(), 2);

        let keys: Vec<_> = map.keys().collect();
        assert_eq!(keys[0].key, vec![0x00, 0x01, 0x02]);
        assert_eq!(keys[1].key, vec![0x00, 0x01, 0x03]);
    }

    #[test]
    fn test_clustering_key_ord_valid_comparison() {
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
        let ck3 = ClusteringKey::single("ts", Value::Timestamp(1000));

        assert_eq!(ck1.cmp(&ck2), Ordering::Less);
        assert_eq!(ck2.cmp(&ck1), Ordering::Greater);
        assert_eq!(ck1.cmp(&ck3), Ordering::Equal);

        let ck_multi1 = ClusteringKey::new(vec![
            ("year".to_string(), Value::Integer(2024)),
            ("month".to_string(), Value::SmallInt(1)),
        ]);
        let ck_multi2 = ClusteringKey::new(vec![
            ("year".to_string(), Value::Integer(2024)),
            ("month".to_string(), Value::SmallInt(2)),
        ]);

        assert_eq!(ck_multi1.cmp(&ck_multi2), Ordering::Less);
    }

    // `Ord::cmp` must stay deterministic and antisymmetric even for mismatched value types.
    #[test]
    fn test_clustering_key_ord_type_mismatch_is_total_and_does_not_panic() {
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Integer(2000));
        let ord_12 = ck1.cmp(&ck2);
        let ord_21 = ck2.cmp(&ck1);

        assert_eq!(ord_12, ck1.cmp(&ck2), "comparison must be deterministic");

        assert_ne!(
            ord_12,
            Ordering::Equal,
            "mismatched types must not compare Equal"
        );
        assert_eq!(
            ord_12.reverse(),
            ord_21,
            "ordering must be antisymmetric (a.cmp(b) == b.cmp(a).reverse())"
        );

        assert_eq!(ck1.cmp(&ck1), Ordering::Equal);

        use std::collections::BTreeMap;
        let mut map = BTreeMap::new();
        map.insert(ck1.clone(), "a");
        map.insert(ck2.clone(), "b");
        assert_eq!(map.len(), 2, "both distinct keys should be retained");
    }

    #[test]
    fn test_clustering_key_ord_btree_ordering() {
        use std::collections::BTreeMap;

        let mut map = BTreeMap::new();

        let ck3 = ClusteringKey::single("ts", Value::Timestamp(3000));
        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));

        map.insert(ck3.clone(), "value3");
        map.insert(ck1.clone(), "value1");
        map.insert(ck2.clone(), "value2");

        let values: Vec<_> = map.values().copied().collect();
        assert_eq!(values, vec!["value1", "value2", "value3"]);
    }

    // Frozen lists compare element-wise; a strict prefix sorts first.
    #[test]
    fn test_compare_frozen_list_values() {
        let list_a = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
        ])));
        let list_b = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("c".to_string()),
        ])));
        let list_c = Value::Frozen(Box::new(Value::List(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
            Value::Text("c".to_string()),
        ])));

        assert_eq!(compare_values(&list_a, &list_a).unwrap(), Ordering::Equal);
        assert_eq!(compare_values(&list_a, &list_b).unwrap(), Ordering::Less);
        assert_eq!(compare_values(&list_b, &list_a).unwrap(), Ordering::Greater);
        assert_eq!(compare_values(&list_a, &list_c).unwrap(), Ordering::Less);
        assert_eq!(compare_values(&list_c, &list_a).unwrap(), Ordering::Greater);
    }

    // Frozen-list clustering keys of different lengths stay distinct in a `BTreeMap`.
    #[test]
    fn test_frozen_list_clustering_key_btree_ordering() {
        use std::collections::BTreeMap;

        let mut map = BTreeMap::new();

        let ck_2elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_0_0".to_string()),
                Value::Text("ck_0_1".to_string()),
            ]))),
        );
        let ck_3elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_1_0".to_string()),
                Value::Text("ck_1_1".to_string()),
                Value::Text("ck_1_2".to_string()),
            ]))),
        );
        let ck_4elem = ClusteringKey::single(
            "tags",
            Value::Frozen(Box::new(Value::List(vec![
                Value::Text("ck_2_0".to_string()),
                Value::Text("ck_2_1".to_string()),
                Value::Text("ck_2_2".to_string()),
                Value::Text("ck_2_3".to_string()),
            ]))),
        );

        map.insert(ck_4elem.clone(), "4elem");
        map.insert(ck_2elem.clone(), "2elem");
        map.insert(ck_3elem.clone(), "3elem");

        assert_eq!(map.len(), 3, "All frozen list CKs should be distinct");

        let values: Vec<_> = map.values().copied().collect();
        assert_eq!(values, vec!["2elem", "3elem", "4elem"]);
    }

    // Round-trip tests: `to_bytes` followed by `from_bytes` yields the original key.
    #[test]
    fn test_partition_key_from_bytes_single_int() {
        let schema = TableSchema {
            keyspace: "ks".to_string(),
            table: "tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: HashMap::new(),
        };

        let original = PartitionKey::single("id", Value::Integer(42));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_single_uuid() {
        let schema = TableSchema {
            keyspace: "ks".to_string(),
            table: "tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "uuid".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: HashMap::new(),
        };

        let uuid_bytes = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
        let original = PartitionKey::single("id", Value::Uuid(uuid_bytes));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_single_text() {
        let schema = TableSchema {
            keyspace: "ks".to_string(),
            table: "tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "name".to_string(),
                data_type: "text".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: HashMap::new(),
        };

        let original = PartitionKey::single("name", Value::Text("hello".to_string()));
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_partition_key_from_bytes_multi_component() {
        let schema = TableSchema {
            keyspace: "ks".to_string(),
            table: "tbl".to_string(),
            partition_keys: vec![
                KeyColumn {
                    name: "tenant".to_string(),
                    data_type: "text".to_string(),
                    position: 0,
                },
                KeyColumn {
                    name: "id".to_string(),
                    data_type: "int".to_string(),
                    position: 1,
                },
            ],
            clustering_keys: vec![],
            columns: vec![],
            comments: HashMap::new(),
        };

        let original = PartitionKey::new(vec![
            ("tenant".to_string(), Value::Text("acme".to_string())),
            ("id".to_string(), Value::Integer(99)),
        ]);
        let bytes = original.to_bytes(&schema).unwrap();
        let decoded = PartitionKey::from_bytes(&bytes, &schema).unwrap();
        assert_eq!(original, decoded);
    }

    // Empty input is rejected.
    #[test]
    fn test_partition_key_from_bytes_empty_errors() {
        let schema = TableSchema {
            keyspace: "ks".to_string(),
            table: "tbl".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: HashMap::new(),
        };

        assert!(PartitionKey::from_bytes(&[], &schema).is_err());
    }

    // `Ord::cmp` on mismatched value types must not panic and must be repeatable.
    #[test]
    fn test_clustering_key_cmp_type_mismatch_does_not_panic() {
        let key_a = ClusteringKey {
            columns: vec![("col".to_string(), Value::Integer(1))],
        };
        let key_b = ClusteringKey {
            columns: vec![("col".to_string(), Value::Text("1".to_string()))],
        };

        let first = key_a.cmp(&key_b);
        let second = key_a.cmp(&key_b);
        assert_eq!(first, second);

        assert_eq!(key_a.cmp(&key_a), Ordering::Equal);
    }
}