Skip to main content

cqlite_core/storage/write_engine/
mutation.rs

1//! Mutation types for CQL write operations
2//!
3//! Represents INSERT, UPDATE, DELETE operations as structured mutations.
4//! Supports cell-level operations with timestamps and TTL.
5//!
6//! This module implements the core data types for M5 write support:
7//! - `Mutation`: Represents a write operation (INSERT, UPDATE, DELETE)
8//! - `DecoratedKey`: Token + raw key bytes for partition ordering
9//! - `PartitionKey`: Multi-column partition key with schema-aware encoding
10//! - `ClusteringKey`: Multi-column clustering key with ASC/DESC ordering
11//! - `CellOperation`: Cell-level write/delete operations
12
13use crate::error::{Error, Result};
14use crate::schema::{ClusteringOrder, TableSchema};
15use crate::types::{ComparatorType, Value};
16use std::cmp::Ordering;
17
/// Table identifier (keyspace + table name).
///
/// Derives `Eq` and `Hash`, so it can be used as a key in hashed collections.
/// The type itself performs no validation or normalisation of either name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct TableId {
    /// Keyspace name
    pub keyspace: String,
    /// Table name (unqualified, i.e. without the keyspace prefix)
    pub table: String,
}
26
27impl TableId {
28    /// Create a new table identifier
29    pub fn new(keyspace: impl Into<String>, table: impl Into<String>) -> Self {
30        Self {
31            keyspace: keyspace.into(),
32            table: table.into(),
33        }
34    }
35
36    /// Get the fully qualified table name (keyspace.table)
37    pub fn qualified_name(&self) -> String {
38        format!("{}.{}", self.keyspace, self.table)
39    }
40}
41
42impl std::fmt::Display for TableId {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "{}.{}", self.keyspace, self.table)
45    }
46}
47
/// A mutation represents a write operation (INSERT, UPDATE, DELETE)
///
/// This is the fundamental unit of write operations in CQLite, corresponding to
/// a single CQL INSERT/UPDATE/DELETE statement. Each mutation targets a specific
/// row (identified by partition key + optional clustering key) and contains
/// one or more cell operations.
///
/// # Tombstone Support (M5.2)
///
/// Mutations can represent various deletion types:
/// - Cell tombstone: `CellOperation::Delete` for single column
/// - Row tombstone: `CellOperation::DeleteRow` for entire row
/// - Range tombstone: `range_tombstones` field for clustering key ranges
/// - Partition tombstone: `partition_tombstone` field for entire partition
///
/// `Mutation::new` leaves `partition_tombstone` as `None` and `range_tombstones`
/// empty; both fields are public and can be assigned after construction.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Mutation {
    /// Target table
    pub table: TableId,
    /// Partition key values
    pub partition_key: PartitionKey,
    /// Clustering key values (None for tables without clustering keys)
    pub clustering_key: Option<ClusteringKey>,
    /// Cell-level operations (writes or deletes)
    pub operations: Vec<CellOperation>,
    /// Timestamp in microseconds since Unix epoch
    pub timestamp_micros: i64,
    /// Time-to-live in seconds applied to all cells in this mutation (None = no expiration).
    ///
    /// This is set by `USING TTL` in CQL statements and applies uniformly to all
    /// `Write` operations. For per-column TTL, use `CellOperation::WriteWithTtl`
    /// in the operations list instead.
    pub ttl_seconds: Option<u32>,
    /// Partition tombstone (deletes entire partition); `None` when the mutation
    /// does not delete the partition
    pub partition_tombstone: Option<PartitionTombstone>,
    /// Range tombstones (delete clustering key ranges within partition); empty
    /// when the mutation carries no range deletions
    pub range_tombstones: Vec<RangeTombstone>,
}
85
86impl Mutation {
87    /// Create a new mutation
88    pub fn new(
89        table: TableId,
90        partition_key: PartitionKey,
91        clustering_key: Option<ClusteringKey>,
92        operations: Vec<CellOperation>,
93        timestamp_micros: i64,
94        ttl_seconds: Option<u32>,
95    ) -> Self {
96        Self {
97            table,
98            partition_key,
99            clustering_key,
100            operations,
101            timestamp_micros,
102            ttl_seconds,
103            partition_tombstone: None,
104            range_tombstones: Vec::new(),
105        }
106    }
107
108    /// Get the decorated key for this mutation (token + raw bytes)
109    pub fn decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
110        self.partition_key.to_decorated_key(schema)
111    }
112}
113
/// Partition tombstone for deleting entire partition
///
/// Stored in the partition header and shadows all rows in the partition
/// when the partition deletion time is greater than the row timestamps.
///
/// The two fields use different units: `deletion_time` is in microseconds,
/// `local_deletion_time` is in seconds.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PartitionTombstone {
    /// Deletion timestamp in microseconds since Unix epoch
    pub deletion_time: i64,
    /// Local deletion time in seconds since Unix epoch
    // NOTE(review): a signed 32-bit seconds counter cannot represent instants
    // past January 2038 — confirm this matches the target on-disk format.
    pub local_deletion_time: i32,
}
125
/// Range tombstone for deleting a range of clustering keys
///
/// Stored as markers within the partition data and shadows all rows
/// in the specified clustering key range.
///
/// This type does not itself verify that `start` sorts before `end`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RangeTombstone {
    /// Start bound (inclusive or exclusive)
    pub start: ClusteringBound,
    /// End bound (inclusive or exclusive)
    pub end: ClusteringBound,
    /// Deletion timestamp in microseconds since Unix epoch
    pub deletion_time: i64,
    /// Local deletion time in seconds since Unix epoch (note: seconds, unlike
    /// `deletion_time`, which is in microseconds)
    pub local_deletion_time: i32,
}
141
/// Clustering key bound for range tombstones
///
/// Defines the boundary of a range deletion. Can be inclusive or exclusive,
/// or represent the minimum/maximum possible clustering key.
///
/// Only `PartialEq` is derived; no ordering between bounds is defined on this type.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ClusteringBound {
    /// Inclusive bound (clustering key is part of the deletion range)
    Inclusive(ClusteringKey),
    /// Exclusive bound (clustering key is NOT part of the deletion range)
    Exclusive(ClusteringKey),
    /// Before all clustering keys (start of partition)
    Bottom,
    /// After all clustering keys (end of partition)
    Top,
}
157
/// Operations that can be applied to individual cells within a row.
///
/// # Per-Cell TTL
///
/// Per-cell TTL is supported via the `WriteWithTtl` variant when using
/// the JSON mutation format directly. CQL syntax (`USING TTL`) applies
/// TTL uniformly to all cells in a statement. To set different TTLs
/// per column, submit separate mutations or use JSON mutations with
/// `WriteWithTtl`:
///
/// ```json
/// {"WriteWithTtl": {"column": "session_token", "value": {"Text": "abc"}, "ttl_seconds": 3600}}
/// ```
///
/// The serde derives use the default externally tagged representation, which
/// is why the variant name appears as the outer JSON key in the example above.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CellOperation {
    /// Write a value to a column
    Write {
        /// Column name
        column: String,
        /// Column value
        value: Value,
    },
    /// Write a value to a column with TTL (expiring cell).
    ///
    /// The cell will expire after `ttl_seconds` seconds. This is the only
    /// way to set per-column TTL — CQL `USING TTL` applies to all cells
    /// in a statement. Use JSON mutations to set different TTLs per column.
    WriteWithTtl {
        /// Column name
        column: String,
        /// Column value
        value: Value,
        /// Time-to-live in seconds
        ttl_seconds: u32,
    },
    /// Delete a specific column
    Delete {
        /// Column name
        column: String,
    },
    /// Delete entire row (row tombstone); carries no column because it applies
    /// to the row addressed by the enclosing mutation
    DeleteRow,
}
201
/// Partition key with multi-column support
///
/// Stores the partition key as a list of (column name, value) pairs.
/// The order must match the schema's partition key definition.
///
/// Encoding (`to_bytes`) pairs values with schema columns by position only;
/// the stored column names are not checked against the schema there.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PartitionKey {
    /// Column name and value pairs (in schema order)
    pub columns: Vec<(String, Value)>,
}
211
212impl PartitionKey {
213    /// Create a new partition key
214    pub fn new(columns: Vec<(String, Value)>) -> Self {
215        Self { columns }
216    }
217
218    /// Create a partition key from a single column
219    pub fn single(column: impl Into<String>, value: Value) -> Self {
220        Self {
221            columns: vec![(column.into(), value)],
222        }
223    }
224
225    /// Serialize partition key to bytes according to Cassandra's on-disk encoding.
226    ///
227    /// Single-component keys are written as raw value bytes.
228    /// Multi-component keys use `[len][value][0x00]` per component, including a
229    /// trailing `0x00` after the final component.
230    pub fn to_bytes(&self, schema: &TableSchema) -> Result<Vec<u8>> {
231        if self.columns.is_empty() {
232            return Err(Error::InvalidInput("Empty partition key".to_string()));
233        }
234
235        // Validate column count matches schema
236        if self.columns.len() != schema.partition_keys.len() {
237            return Err(Error::InvalidInput(format!(
238                "Partition key column count mismatch: expected {}, got {}",
239                schema.partition_keys.len(),
240                self.columns.len()
241            )));
242        }
243
244        let mut result = Vec::new();
245
246        // Single-component key: no length prefix
247        if self.columns.len() == 1 {
248            let value_bytes =
249                self.serialize_value(&self.columns[0].1, &schema.partition_keys[0])?;
250            result.extend_from_slice(&value_bytes);
251            return Ok(result);
252        }
253
254        // Multi-component partition keys use a `0x00` end-of-component marker
255        // after every component, including the last one.
256        for (i, (_, value)) in self.columns.iter().enumerate() {
257            let value_bytes = self.serialize_value(value, &schema.partition_keys[i])?;
258            let len = value_bytes.len();
259            if len > u16::MAX as usize {
260                return Err(Error::InvalidInput(format!(
261                    "Partition key component too large: {} bytes",
262                    len
263                )));
264            }
265            // 2-byte big-endian length prefix
266            result.extend_from_slice(&(len as u16).to_be_bytes());
267            result.extend_from_slice(&value_bytes);
268            result.push(0x00);
269        }
270
271        Ok(result)
272    }
273
274    /// Convert to DecoratedKey (token + raw bytes)
275    pub fn to_decorated_key(&self, schema: &TableSchema) -> Result<DecoratedKey> {
276        let key_bytes = self.to_bytes(schema)?;
277        let token = calculate_murmur3_token(&key_bytes)?;
278        Ok(DecoratedKey::new(token, key_bytes))
279    }
280
281    /// Deserialize partition key from raw bytes (inverse of `to_bytes`)
282    ///
283    /// Single-component keys are raw value bytes.
284    /// Multi-component keys use `[len:u16 BE][value bytes][0x00]` per component.
285    pub fn from_bytes(data: &[u8], schema: &TableSchema) -> Result<Self> {
286        if schema.partition_keys.is_empty() {
287            return Err(Error::InvalidInput(
288                "Schema has no partition keys".to_string(),
289            ));
290        }
291
292        if data.is_empty() {
293            return Err(Error::InvalidInput("Empty partition key bytes".to_string()));
294        }
295
296        // Delegate to the canonical codec shared with the read/scan path so the
297        // two never diverge (Issue #586).
298        let columns =
299            crate::storage::partition_key_codec::decode_partition_key_columns(data, schema)?;
300
301        Ok(PartitionKey { columns })
302    }
303
304    /// Serialize a single value to bytes according to its CQL type
305    fn serialize_value(
306        &self,
307        value: &Value,
308        key_column: &crate::schema::KeyColumn,
309    ) -> Result<Vec<u8>> {
310        // Get comparator type from schema
311        let comparator = ComparatorType::from_data_type(&key_column.data_type)?;
312
313        serialize_value_bytes(value, &comparator)
314    }
315}
316
/// Clustering key with multi-column support
///
/// Stores the clustering key as a list of (column name, value) pairs.
/// The order must match the schema's clustering key definition.
///
/// Two orderings exist for this type: the schema-aware `compare` method
/// (honours ASC/DESC) and the schema-less `Ord` implementation.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ClusteringKey {
    /// Column name and value pairs (in schema order)
    pub columns: Vec<(String, Value)>,
}
326
327impl ClusteringKey {
328    /// Create a new clustering key
329    pub fn new(columns: Vec<(String, Value)>) -> Self {
330        Self { columns }
331    }
332
333    /// Create a clustering key from a single column
334    pub fn single(column: impl Into<String>, value: Value) -> Self {
335        Self {
336            columns: vec![(column.into(), value)],
337        }
338    }
339
340    /// Compare two clustering keys according to schema-defined ordering
341    ///
342    /// Each clustering column can be ASC or DESC. This method requires
343    /// schema information to determine the correct ordering.
344    pub fn compare(&self, other: &Self, schema: &TableSchema) -> Result<Ordering> {
345        // Compare column by column according to schema ordering
346        for (i, ((_, a_val), (_, b_val))) in
347            self.columns.iter().zip(other.columns.iter()).enumerate()
348        {
349            if i >= schema.clustering_keys.len() {
350                return Err(Error::Schema(format!(
351                    "Clustering key has more columns than schema: {} > {}",
352                    i + 1,
353                    schema.clustering_keys.len()
354                )));
355            }
356
357            let cluster_col = &schema.clustering_keys[i];
358            let ordering = compare_values(a_val, b_val)?;
359
360            // Apply DESC ordering if specified in schema
361            let final_ordering = if cluster_col.order == ClusteringOrder::Desc {
362                ordering.reverse()
363            } else {
364                ordering
365            };
366
367            if final_ordering != Ordering::Equal {
368                return Ok(final_ordering);
369            }
370        }
371
372        Ok(Ordering::Equal)
373    }
374}
375
376impl Ord for ClusteringKey {
377    fn cmp(&self, other: &Self) -> Ordering {
378        // Fallback comparison without schema: lexicographic by value
379        // This is used for BTreeMap ordering in memtable.
380        // Schema-aware comparison should use `compare()` method.
381        for ((_, a_val), (_, b_val)) in self.columns.iter().zip(other.columns.iter()) {
382            let ordering = compare_values(a_val, b_val).unwrap_or_else(|_| {
383                // Type mismatch fallback: deterministic ordering via Debug representation.
384                // This only matters for BTreeMap key placement in memtable, not persistence.
385                // Heterogeneous SSTables (e.g. Frozen(List) vs List) must not crash the process.
386                format!("{a_val:?}").cmp(&format!("{b_val:?}"))
387            });
388            if ordering != Ordering::Equal {
389                return ordering;
390            }
391        }
392        self.columns.len().cmp(&other.columns.len())
393    }
394}
395
396impl PartialOrd for ClusteringKey {
397    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
398        Some(self.cmp(other))
399    }
400}
401
// Marker impl: `Ord` has `Eq` as a supertrait, so it is required for the
// `Ord` implementation on this type.
// NOTE(review): equality comes from the derived `PartialEq`, which compares
// column names as well as values, whereas `Ord::cmp` ignores the names — two
// keys can compare `Ordering::Equal` yet be `!=`. Confirm no caller relies on
// the two agreeing.
impl Eq for ClusteringKey {}
403
/// Decorated key: Murmur3 token + raw partition key bytes
///
/// This is the fundamental ordering key in Cassandra SSTables. Partitions are
/// ordered first by token (i64), then by raw key bytes for collision resolution.
///
/// # Hash Collision Handling
///
/// While Murmur3 hash collisions are extremely rare in practice, the ordering
/// implementation handles them correctly:
/// 1. Primary ordering: by token (Murmur3 hash value)
/// 2. Secondary ordering: by raw partition key bytes (for hash collisions)
///
/// This ensures deterministic, stable ordering even when two different partition
/// keys produce the same token value.
///
/// Equality is derived over both fields, which agrees with the ordering above.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DecoratedKey {
    /// Murmur3 hash token (i64)
    pub token: i64,
    /// Raw partition key bytes (the encoded key the token was computed from
    /// when built via `from_key_bytes`; `new` accepts any pairing)
    pub key: Vec<u8>,
}
425
426impl DecoratedKey {
427    /// Create a new decorated key
428    pub fn new(token: i64, key: Vec<u8>) -> Self {
429        Self { token, key }
430    }
431
432    /// Create a decorated key from raw partition key bytes
433    pub fn from_key_bytes(key_bytes: Vec<u8>) -> Result<Self> {
434        let token = calculate_murmur3_token(&key_bytes)?;
435        Ok(Self::new(token, key_bytes))
436    }
437}
438
439impl Ord for DecoratedKey {
440    fn cmp(&self, other: &Self) -> Ordering {
441        // Primary ordering: by token
442        match self.token.cmp(&other.token) {
443            Ordering::Equal => {
444                // Secondary ordering: by raw key bytes (for hash collisions)
445                self.key.cmp(&other.key)
446            }
447            other => other,
448        }
449    }
450}
451
452impl PartialOrd for DecoratedKey {
453    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
454        Some(self.cmp(other))
455    }
456}
457
458/// Calculate Murmur3 token from partition key bytes
459///
460/// Uses Cassandra's Murmur3Partitioner algorithm:
461/// 1. Compute Cassandra's `MurmurHash.hash3_x64_128`
462/// 2. Take `h1` as the signed token
463/// 3. Apply `normalize`: map `i64::MIN` → `i64::MAX` (Cassandra excludes MIN_VALUE)
464fn calculate_murmur3_token(key_bytes: &[u8]) -> Result<i64> {
465    if key_bytes.is_empty() {
466        return Ok(i64::MIN);
467    }
468
469    Ok(crate::util::cassandra_murmur3::cassandra_murmur3_token(
470        key_bytes,
471    ))
472}
473
474/// Serialize a Value to bytes according to its CQL type
475///
476/// This is used for partition key encoding and follows Cassandra's
477/// type-specific serialization rules.
478fn serialize_value_bytes(value: &Value, comparator: &ComparatorType) -> Result<Vec<u8>> {
479    match (value, comparator) {
480        (Value::Null, _) => Ok(Vec::new()),
481
482        (Value::Boolean(b), ComparatorType::Boolean) => Ok(vec![if *b { 1 } else { 0 }]),
483
484        (Value::TinyInt(n), ComparatorType::TinyInt) => Ok(vec![*n as u8]),
485
486        (Value::SmallInt(n), ComparatorType::SmallInt) => Ok(n.to_be_bytes().to_vec()),
487
488        (Value::Integer(n), ComparatorType::Int) => Ok(n.to_be_bytes().to_vec()),
489
490        (Value::BigInt(n), ComparatorType::BigInt) => Ok(n.to_be_bytes().to_vec()),
491
492        (Value::Counter(n), ComparatorType::Counter) => Ok(n.to_be_bytes().to_vec()),
493
494        (Value::Float32(f), ComparatorType::Float32) => Ok(f.to_bits().to_be_bytes().to_vec()),
495
496        (Value::Float(f), ComparatorType::Float) => Ok(f.to_bits().to_be_bytes().to_vec()),
497
498        (Value::Text(s), ComparatorType::Text) => Ok(s.as_bytes().to_vec()),
499
500        (Value::Blob(bytes), ComparatorType::Blob) => Ok(bytes.clone()),
501
502        (Value::Timestamp(millis), ComparatorType::Timestamp) => Ok(millis.to_be_bytes().to_vec()),
503
504        (Value::Date(days), ComparatorType::Date) => {
505            // Cassandra DATE: stored as unsigned int with Integer.MIN_VALUE offset
506            let stored = days.wrapping_sub(i32::MIN) as u32;
507            Ok(stored.to_be_bytes().to_vec())
508        }
509
510        (Value::Uuid(bytes), ComparatorType::Uuid) => Ok(bytes.to_vec()),
511
512        // Time and Inet are mapped to Custom types in ComparatorType
513        (Value::Time(nanos), ComparatorType::Custom(name)) if name == "time" => {
514            Ok(nanos.to_be_bytes().to_vec())
515        }
516
517        (Value::Inet(bytes), ComparatorType::Custom(name)) if name == "inet" => Ok(bytes.clone()),
518
519        (Value::Varint(bytes), ComparatorType::Varint) => Ok(bytes.clone()),
520
521        (Value::Decimal { scale, unscaled }, ComparatorType::Decimal) => {
522            // Decimal: [scale (4B BE i32)][unscaled bytes]
523            let mut result = Vec::new();
524            result.extend_from_slice(&scale.to_be_bytes());
525            result.extend_from_slice(unscaled);
526            Ok(result)
527        }
528
529        (
530            Value::Duration {
531                months,
532                days,
533                nanos,
534            },
535            ComparatorType::Duration,
536        ) => {
537            // Duration: [months (4B)][days (4B)][nanos (8B)]
538            let mut result = Vec::new();
539            result.extend_from_slice(&months.to_be_bytes());
540            result.extend_from_slice(&days.to_be_bytes());
541            result.extend_from_slice(&nanos.to_be_bytes());
542            Ok(result)
543        }
544
545        _ => Err(Error::InvalidInput(format!(
546            "Type mismatch: value {:?} does not match comparator {:?}",
547            value, comparator
548        ))),
549    }
550}
551
552/// Compare two values for ordering
553fn compare_values(a: &Value, b: &Value) -> Result<Ordering> {
554    use Value::*;
555
556    match (a, b) {
557        (Null, Null) => Ok(Ordering::Equal),
558        (Null, _) => Ok(Ordering::Less),
559        (_, Null) => Ok(Ordering::Greater),
560
561        (Boolean(a), Boolean(b)) => Ok(a.cmp(b)),
562        (TinyInt(a), TinyInt(b)) => Ok(a.cmp(b)),
563        (SmallInt(a), SmallInt(b)) => Ok(a.cmp(b)),
564        (Integer(a), Integer(b)) => Ok(a.cmp(b)),
565        (BigInt(a), BigInt(b)) => Ok(a.cmp(b)),
566        (Counter(a), Counter(b)) => Ok(a.cmp(b)),
567        (Float32(a), Float32(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
568        (Float(a), Float(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
569        (Text(a), Text(b)) => Ok(a.cmp(b)),
570        (Blob(a), Blob(b)) => Ok(a.cmp(b)),
571        (Timestamp(a), Timestamp(b)) => Ok(a.cmp(b)),
572        (Date(a), Date(b)) => Ok(a.cmp(b)),
573        (Time(a), Time(b)) => Ok(a.cmp(b)),
574        (Uuid(a), Uuid(b)) => Ok(a.cmp(b)),
575        (Inet(a), Inet(b)) => Ok(a.cmp(b)),
576
577        // Collection types (element-wise lexicographic comparison)
578        (List(a), List(b)) | (Set(a), Set(b)) => {
579            for (elem_a, elem_b) in a.iter().zip(b.iter()) {
580                let ord = compare_values(elem_a, elem_b)?;
581                if ord != Ordering::Equal {
582                    return Ok(ord);
583                }
584            }
585            Ok(a.len().cmp(&b.len()))
586        }
587        (Map(a), Map(b)) => {
588            for ((ka, va), (kb, vb)) in a.iter().zip(b.iter()) {
589                let key_ord = compare_values(ka, kb)?;
590                if key_ord != Ordering::Equal {
591                    return Ok(key_ord);
592                }
593                let val_ord = compare_values(va, vb)?;
594                if val_ord != Ordering::Equal {
595                    return Ok(val_ord);
596                }
597            }
598            Ok(a.len().cmp(&b.len()))
599        }
600        (Tuple(a), Tuple(b)) => {
601            for (fa, fb) in a.iter().zip(b.iter()) {
602                let ord = compare_values(fa, fb)?;
603                if ord != Ordering::Equal {
604                    return Ok(ord);
605                }
606            }
607            Ok(a.len().cmp(&b.len()))
608        }
609
610        // Frozen wrapper: compare inner values
611        (Frozen(a), Frozen(b)) => compare_values(a, b),
612
613        _ => Err(Error::InvalidInput(format!(
614            "Cannot compare values of different types: {:?} vs {:?}",
615            a, b
616        ))),
617    }
618}
619
620#[cfg(test)]
621mod tests {
622    use super::*;
623    use crate::schema::{ClusteringColumn, ClusteringOrder, KeyColumn};
624    use std::collections::HashMap;
625
626    fn create_test_schema(
627        partition_cols: Vec<(&str, &str)>,
628        clustering_cols: Vec<(&str, &str, ClusteringOrder)>,
629    ) -> TableSchema {
630        TableSchema {
631            keyspace: "test_ks".to_string(),
632            table: "test_table".to_string(),
633            partition_keys: partition_cols
634                .into_iter()
635                .enumerate()
636                .map(|(i, (name, data_type))| KeyColumn {
637                    name: name.to_string(),
638                    data_type: data_type.to_string(),
639                    position: i,
640                })
641                .collect(),
642            clustering_keys: clustering_cols
643                .into_iter()
644                .enumerate()
645                .map(|(i, (name, data_type, order))| ClusteringColumn {
646                    name: name.to_string(),
647                    data_type: data_type.to_string(),
648                    position: i,
649                    order,
650                })
651                .collect(),
652            columns: vec![],
653            comments: HashMap::new(),
654        }
655    }
656
657    #[test]
658    fn test_table_id() {
659        let table_id = TableId::new("my_keyspace", "my_table");
660        assert_eq!(table_id.keyspace, "my_keyspace");
661        assert_eq!(table_id.table, "my_table");
662        assert_eq!(table_id.qualified_name(), "my_keyspace.my_table");
663        assert_eq!(table_id.to_string(), "my_keyspace.my_table");
664    }
665
666    #[test]
667    fn test_partition_key_single_int() {
668        let schema = create_test_schema(vec![("id", "int")], vec![]);
669        let pk = PartitionKey::single("id", Value::Integer(42));
670
671        let bytes = pk.to_bytes(&schema).unwrap();
672        // Single component: no length prefix, just 4-byte big-endian int
673        assert_eq!(bytes, vec![0x00, 0x00, 0x00, 0x2A]);
674    }
675
676    #[test]
677    fn test_partition_key_multi_component() {
678        let schema = create_test_schema(vec![("id", "int"), ("name", "text")], vec![]);
679        let pk = PartitionKey::new(vec![
680            ("id".to_string(), Value::Integer(42)),
681            ("name".to_string(), Value::Text("hello".to_string())),
682        ]);
683
684        let bytes = pk.to_bytes(&schema).unwrap();
685        // Multi-component partition key format:
686        // [len1(2B)][val1][0x00][len2(2B)][val2][0x00]
687        let expected = vec![
688            0x00, 0x04, // len1 = 4
689            0x00, 0x00, 0x00, 0x2A, // int = 42
690            0x00, // end-of-component after component 1
691            0x00, 0x05, // len2 = 5
692            b'h', b'e', b'l', b'l', b'o', // text = "hello"
693            0x00, // end-of-component after component 2
694        ];
695        assert_eq!(bytes, expected);
696    }
697
698    #[test]
699    fn test_partition_key_three_components() {
700        // Issue #438: Verify 3-component composite keys (e.g., tick_data table)
701        let schema = create_test_schema(
702            vec![("symbol", "text"), ("exchange", "text"), ("bucket", "int")],
703            vec![],
704        );
705        let pk = PartitionKey::new(vec![
706            ("symbol".to_string(), Value::Text("AAPL".to_string())),
707            ("exchange".to_string(), Value::Text("NYSE".to_string())),
708            ("bucket".to_string(), Value::Integer(100)),
709        ]);
710
711        let bytes = pk.to_bytes(&schema).unwrap();
712        // Composite partition key format: end-of-component byte after every component
713        let expected = vec![
714            0x00, 0x04, // len1 = 4
715            b'A', b'A', b'P', b'L', // "AAPL"
716            0x00, // end-of-component
717            0x00, 0x04, // len2 = 4
718            b'N', b'Y', b'S', b'E', // "NYSE"
719            0x00, // end-of-component
720            0x00, 0x04, // len3 = 4
721            0x00, 0x00, 0x00, 0x64, // int = 100
722            0x00, // end-of-component
723        ];
724        assert_eq!(bytes, expected);
725    }
726
727    #[test]
728    fn test_decorated_key_ordering() {
729        let dk1 = DecoratedKey::new(100, vec![1, 2, 3]);
730        let dk2 = DecoratedKey::new(200, vec![1, 2, 3]);
731        let dk3 = DecoratedKey::new(100, vec![1, 2, 4]);
732
733        // Order by token first
734        assert!(dk1 < dk2);
735        assert!(dk2 > dk1);
736
737        // Equal tokens: order by key bytes
738        assert!(dk1 < dk3);
739        assert!(dk3 > dk1);
740
741        // Equal tokens and keys
742        let dk4 = DecoratedKey::new(100, vec![1, 2, 3]);
743        assert_eq!(dk1, dk4);
744    }
745
746    #[test]
747    fn test_murmur3_token_empty_key() {
748        let token = calculate_murmur3_token(&[]).unwrap();
749        assert_eq!(token, i64::MIN);
750    }
751
752    #[test]
753    fn test_murmur3_token_deterministic() {
754        let key_bytes = b"test_key";
755        let token1 = calculate_murmur3_token(key_bytes).unwrap();
756        let token2 = calculate_murmur3_token(key_bytes).unwrap();
757        assert_eq!(token1, token2, "Token calculation should be deterministic");
758    }
759
760    #[test]
761    fn test_murmur3_token_different_keys() {
762        let token1 = calculate_murmur3_token(b"key1").unwrap();
763        let token2 = calculate_murmur3_token(b"key2").unwrap();
764        assert_ne!(
765            token1, token2,
766            "Different keys should produce different tokens"
767        );
768    }
769
770    #[test]
771    fn test_murmur3_token_matches_cassandra_for_composite_uuid_key() {
772        // Verified against Cassandra 5.0:
773        // SELECT token(tenant_id, user_id) FROM issue438_probe.multi_pk_raw;
774        let key_bytes = vec![
775            0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40, 0x00, 0x80, 0x00, 0x00, 0x00,
776            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x10, 0x0f, 0x0f, 0x0f, 0x0f, 0x00, 0x00, 0x40,
777            0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xaa, 0x00,
778        ];
779
780        let token = calculate_murmur3_token(&key_bytes).unwrap();
781        assert_eq!(token, -5_116_541_970_184_546_410);
782    }
783
784    #[test]
785    fn test_decorated_key_from_bytes() {
786        let key_bytes = vec![0x00, 0x00, 0x00, 0x2A]; // int = 42
787        let dk = DecoratedKey::from_key_bytes(key_bytes.clone()).unwrap();
788
789        assert_eq!(dk.key, key_bytes);
790        // Token should be calculated consistently
791        let expected_token = calculate_murmur3_token(&key_bytes).unwrap();
792        assert_eq!(dk.token, expected_token);
793    }
794
795    #[test]
796    fn test_clustering_key_ordering() {
797        let schema = create_test_schema(
798            vec![("id", "int")],
799            vec![("ts", "timestamp", ClusteringOrder::Asc)],
800        );
801
802        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
803        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
804
805        let ordering = ck1.compare(&ck2, &schema).unwrap();
806        assert_eq!(ordering, Ordering::Less);
807    }
808
809    #[test]
810    fn test_clustering_key_desc_ordering() {
811        let schema = create_test_schema(
812            vec![("id", "int")],
813            vec![("ts", "timestamp", ClusteringOrder::Desc)],
814        );
815
816        let ck1 = ClusteringKey::single("ts", Value::Timestamp(1000));
817        let ck2 = ClusteringKey::single("ts", Value::Timestamp(2000));
818
819        let ordering = ck1.compare(&ck2, &schema).unwrap();
820        // DESC ordering reverses the comparison
821        assert_eq!(ordering, Ordering::Greater);
822    }
823
#[test]
fn test_mutation_creation() {
    // A mutation built with `Mutation::new` exposes exactly what was passed in:
    // table identity, timestamp, (absent) TTL and the list of cell operations.
    let write_name = CellOperation::Write {
        column: String::from("name"),
        value: Value::Text(String::from("Alice")),
    };

    let built = Mutation::new(
        TableId::new("ks", "table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_name],
        1234567890,
        None,
    );

    assert_eq!(built.table.keyspace, "ks");
    assert_eq!(built.table.table, "table");
    assert_eq!(built.timestamp_micros, 1234567890);
    assert_eq!(built.ttl_seconds, None);
    assert_eq!(built.operations.len(), 1);
}
841
#[test]
fn test_cell_operation_write() {
    // A Write operation carries its column name and value unchanged.
    let write_op = CellOperation::Write {
        column: String::from("age"),
        value: Value::Integer(30),
    };

    if let CellOperation::Write { column, value } = write_op {
        assert_eq!(column, "age");
        assert_eq!(value, Value::Integer(30));
    } else {
        panic!("Expected Write operation");
    }
}
857
#[test]
fn test_cell_operation_delete() {
    // A Delete operation carries the name of the column it targets.
    let delete_op = CellOperation::Delete {
        column: String::from("name"),
    };

    if let CellOperation::Delete { column } = delete_op {
        assert_eq!(column, "name");
    } else {
        panic!("Expected Delete operation");
    }
}
871
#[test]
fn test_cell_operation_delete_row() {
    // The field-less DeleteRow variant can be constructed and pattern-matched.
    let row_delete = CellOperation::DeleteRow;
    assert!(matches!(row_delete, CellOperation::DeleteRow));
}
877
#[test]
fn test_serialize_value_types() {
    // Boolean `true` serializes to the single byte 1.
    let encoded_bool =
        serialize_value_bytes(&Value::Boolean(true), &ComparatorType::Boolean).unwrap();
    assert_eq!(encoded_bool, vec![1]);

    // Integer 42 serializes to four bytes, most significant byte first.
    let encoded_int = serialize_value_bytes(&Value::Integer(42), &ComparatorType::Int).unwrap();
    assert_eq!(encoded_int, vec![0x00, 0x00, 0x00, 0x2A]);

    // Text serializes to the string's bytes with no framing.
    let text_value = Value::Text(String::from("hello"));
    let encoded_text = serialize_value_bytes(&text_value, &ComparatorType::Text).unwrap();
    assert_eq!(encoded_text, b"hello");

    // A UUID serializes to its 16 bytes unchanged.
    let raw_uuid = [
        0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB,
        0xCD, 0xEF,
    ];
    let encoded_uuid =
        serialize_value_bytes(&Value::Uuid(raw_uuid), &ComparatorType::Uuid).unwrap();
    assert_eq!(encoded_uuid, raw_uuid);
}
901
#[test]
fn test_compare_values() {
    // Each row is (left, right, expected ordering of left relative to right).
    // The last two rows cover Null, which sorts before a non-null value.
    let cases = vec![
        (Value::Integer(1), Value::Integer(2), Ordering::Less),
        (Value::Integer(2), Value::Integer(1), Ordering::Greater),
        (Value::Integer(1), Value::Integer(1), Ordering::Equal),
        (Value::Null, Value::Integer(1), Ordering::Less),
        (Value::Integer(1), Value::Null, Ordering::Greater),
    ];

    for (left, right, expected) in &cases {
        assert_eq!(compare_values(left, right).unwrap(), *expected);
    }
}
927
#[test]
fn test_partition_key_to_decorated_key() {
    // A single-int partition key decorates to its 4-byte encoding plus the
    // Murmur3 token of exactly those bytes.
    let schema = create_test_schema(vec![("id", "int")], vec![]);

    let decorated = PartitionKey::single("id", Value::Integer(42))
        .to_decorated_key(&schema)
        .unwrap();
    assert_eq!(decorated.key, vec![0x00, 0x00, 0x00, 0x2A]);

    let direct_token = calculate_murmur3_token(&decorated.key).unwrap();
    assert_eq!(decorated.token, direct_token);
}
940
#[test]
fn test_murmur3_token_cassandra_compatibility() {
    // Sanity properties of the Murmur3 token function. No concrete reference
    // token is pinned here; the checks are: non-zero output for a non-zero key,
    // distinct tokens for distinct keys, and determinism.

    // Key 1: int value 1
    let int_one = vec![0x00, 0x00, 0x00, 0x01];
    let token_one = calculate_murmur3_token(&int_one).unwrap();
    assert_ne!(token_one, 0, "Token should not be zero for non-zero input");

    // Key 2: int value 100
    let int_hundred = vec![0x00, 0x00, 0x00, 0x64];
    let token_hundred = calculate_murmur3_token(&int_hundred).unwrap();
    assert_ne!(
        token_hundred, token_one,
        "Different keys should produce different tokens"
    );

    // Key 3: text value "test"
    let token_text = calculate_murmur3_token(b"test").unwrap();
    assert_ne!(token_text, token_one);
    assert_ne!(token_text, token_hundred);

    // Hashing the first key again must give the same token.
    let token_one_again = calculate_murmur3_token(&int_one).unwrap();
    assert_eq!(token_one, token_one_again, "Tokens must be deterministic");
}
971
#[test]
fn test_decorated_key_btree_ordering() {
    use std::collections::BTreeMap;

    // DecoratedKey must iterate in ascending token order when used as a
    // BTreeMap key, whatever the insertion order was.
    let mut by_key = BTreeMap::new();
    let unsorted = vec![(300, 3, "value3"), (100, 1, "value1"), (200, 2, "value2")];
    for (token, key_byte, label) in unsorted {
        by_key.insert(DecoratedKey::new(token, vec![key_byte]), label);
    }

    let ordered: Vec<_> = by_key.keys().collect();
    assert_eq!(ordered[0].token, 100);
    assert_eq!(ordered[1].token, 200);
    assert_eq!(ordered[2].token, 300);
}
994
#[test]
fn test_decorated_key_hash_collision_handling() {
    use std::collections::BTreeMap;

    // Issue #406: simulated token collision. Two distinct keys sharing one token
    // must fall back to raw key bytes so the ordering stays deterministic.
    let shared_token = 12345_i64;

    let key_a = DecoratedKey::new(shared_token, vec![0x00, 0x01, 0x02]);
    let key_b = DecoratedKey::new(shared_token, vec![0x00, 0x01, 0x03]); // last byte differs
    let key_a_twin = DecoratedKey::new(shared_token, vec![0x00, 0x01, 0x02]); // same as key_a

    // With equal tokens the key bytes decide.
    assert!(key_a < key_b, "Keys with same token should order by bytes");
    assert!(key_b > key_a, "Key comparison should be consistent");
    assert_eq!(
        key_a.cmp(&key_a_twin),
        Ordering::Equal,
        "Identical keys should be equal"
    );

    // The same rules must hold inside a BTreeMap: the twin overwrites key_a's
    // entry, leaving two entries in total.
    let mut by_key = BTreeMap::new();
    by_key.insert(key_b, "value2");
    by_key.insert(key_a, "value1");
    by_key.insert(key_a_twin, "value3");
    assert_eq!(by_key.len(), 2);

    // Iteration follows the raw bytes.
    let ordered: Vec<_> = by_key.keys().collect();
    assert_eq!(ordered[0].key, vec![0x00, 0x01, 0x02]);
    assert_eq!(ordered[1].key, vec![0x00, 0x01, 0x03]);
}
1032
#[test]
fn test_clustering_key_ord_valid_comparison() {
    // Issue #409: `Ord` on ClusteringKey gives the expected result when both
    // sides hold values of the same type.
    let early = ClusteringKey::single("ts", Value::Timestamp(1000));
    let late = ClusteringKey::single("ts", Value::Timestamp(2000));
    let early_twin = ClusteringKey::single("ts", Value::Timestamp(1000));

    assert_eq!(early.cmp(&late), Ordering::Less);
    assert_eq!(late.cmp(&early), Ordering::Greater);
    assert_eq!(early.cmp(&early_twin), Ordering::Equal);

    // Two-column keys: the first column ties, so the second column decides.
    let january = ClusteringKey::new(vec![
        (String::from("year"), Value::Integer(2024)),
        (String::from("month"), Value::SmallInt(1)),
    ]);
    let february = ClusteringKey::new(vec![
        (String::from("year"), Value::Integer(2024)),
        (String::from("month"), Value::SmallInt(2)),
    ]);

    assert_eq!(january.cmp(&february), Ordering::Less);
}
1057
#[test]
fn test_clustering_key_ord_type_mismatch_is_total_and_does_not_panic() {
    use std::collections::BTreeMap;

    // Issue #458/#465: comparing clustering keys whose values have different
    // types must never panic, because a panic inside `cmp` would bring down the
    // memtable BTreeMap. This test checks that the comparison is panic-free,
    // deterministic, antisymmetric and reflexive.
    let ts_key = ClusteringKey::single("ts", Value::Timestamp(1000));
    let int_key = ClusteringKey::single("ts", Value::Integer(2000)); // different value type

    // Reaching the assertions at all proves neither direction panicked.
    let forward = ts_key.cmp(&int_key);
    let backward = int_key.cmp(&ts_key);

    // Asking again yields the same answer.
    assert_eq!(
        forward,
        ts_key.cmp(&int_key),
        "comparison must be deterministic"
    );

    // The two keys differ, so Equal is not an acceptable answer...
    assert_ne!(
        forward,
        Ordering::Equal,
        "mismatched types must not compare Equal"
    );
    // ...and swapping the operands must flip the result.
    assert_eq!(
        forward.reverse(),
        backward,
        "ordering must be antisymmetric (a.cmp(b) == b.cmp(a).reverse())"
    );

    // A key is always Equal to itself.
    assert_eq!(ts_key.cmp(&ts_key), Ordering::Equal);

    // Both keys can coexist as BTreeMap keys without panicking.
    let mut by_key = BTreeMap::new();
    by_key.insert(ts_key, "a");
    by_key.insert(int_key, "b");
    assert_eq!(by_key.len(), 2, "both distinct keys should be retained");
}
1098
#[test]
fn test_clustering_key_ord_btree_ordering() {
    use std::collections::BTreeMap;

    // Issue #409: ClusteringKey as a BTreeMap key. Entries inserted out of
    // order must come back sorted by timestamp.
    let mut by_key = BTreeMap::new();
    let unsorted = vec![(3000, "value3"), (1000, "value1"), (2000, "value2")];
    for (ts, label) in unsorted {
        by_key.insert(ClusteringKey::single("ts", Value::Timestamp(ts)), label);
    }

    let labels: Vec<_> = by_key.values().copied().collect();
    assert_eq!(labels, vec!["value1", "value2", "value3"]);
}
1119
#[test]
fn test_compare_frozen_list_values() {
    // Issue #437: frozen collections used as clustering keys must be comparable.
    // Local helper: wrap a slice of strings as Frozen(List([Text, ...])).
    let frozen_text_list = |items: &[&str]| {
        let elements: Vec<Value> = items
            .iter()
            .map(|item| Value::Text(item.to_string()))
            .collect();
        Value::Frozen(Box::new(Value::List(elements)))
    };

    let ab = frozen_text_list(&["a", "b"]);
    let ac = frozen_text_list(&["a", "c"]);
    let abc = frozen_text_list(&["a", "b", "c"]);

    // A list is Equal to itself.
    assert_eq!(compare_values(&ab, &ab).unwrap(), Ordering::Equal);
    // The second element decides: "b" < "c".
    assert_eq!(compare_values(&ab, &ac).unwrap(), Ordering::Less);
    assert_eq!(compare_values(&ac, &ab).unwrap(), Ordering::Greater);
    // A strict prefix sorts before the longer list.
    assert_eq!(compare_values(&ab, &abc).unwrap(), Ordering::Less);
    assert_eq!(compare_values(&abc, &ab).unwrap(), Ordering::Greater);
}
1146
#[test]
fn test_frozen_list_clustering_key_btree_ordering() {
    use std::collections::BTreeMap;

    // Issue #437: clustering keys holding frozen lists must stay distinct and
    // sort correctly inside a BTreeMap.
    // Local helper: a single-column "tags" key wrapping Frozen(List([Text, ...])).
    let tags_key = |items: &[&str]| {
        let elements: Vec<Value> = items
            .iter()
            .map(|item| Value::Text(item.to_string()))
            .collect();
        ClusteringKey::single("tags", Value::Frozen(Box::new(Value::List(elements))))
    };

    // Lists of different lengths, each with its own element prefix.
    let two = tags_key(&["ck_0_0", "ck_0_1"]);
    let three = tags_key(&["ck_1_0", "ck_1_1", "ck_1_2"]);
    let four = tags_key(&["ck_2_0", "ck_2_1", "ck_2_2", "ck_2_3"]);

    // Insert deliberately out of order.
    let mut by_key = BTreeMap::new();
    by_key.insert(four, "4elem");
    by_key.insert(two, "2elem");
    by_key.insert(three, "3elem");

    // No two keys may collapse into one entry.
    assert_eq!(by_key.len(), 3, "All frozen list CKs should be distinct");

    // The first element already differs, so the order is ck_0_* < ck_1_* < ck_2_*.
    let labels: Vec<_> = by_key.values().copied().collect();
    assert_eq!(labels, vec!["2elem", "3elem", "4elem"]);
}
1192
#[test]
fn test_partition_key_from_bytes_single_int() {
    // Round trip: a single `int` partition key survives to_bytes -> from_bytes.
    let id_column = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: String::from("ks"),
        table: String::from("tbl"),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let expected = PartitionKey::single("id", Value::Integer(42));
    let encoded = expected.to_bytes(&schema).unwrap();
    let round_tripped = PartitionKey::from_bytes(&encoded, &schema).unwrap();
    assert_eq!(expected, round_tripped);
}
1213
#[test]
fn test_partition_key_from_bytes_single_uuid() {
    // Round trip: a single `uuid` partition key survives to_bytes -> from_bytes.
    let id_column = KeyColumn {
        name: String::from("id"),
        data_type: String::from("uuid"),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: String::from("ks"),
        table: String::from("tbl"),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let raw_uuid = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
    let expected = PartitionKey::single("id", Value::Uuid(raw_uuid));
    let encoded = expected.to_bytes(&schema).unwrap();
    let round_tripped = PartitionKey::from_bytes(&encoded, &schema).unwrap();
    assert_eq!(expected, round_tripped);
}
1235
#[test]
fn test_partition_key_from_bytes_single_text() {
    // Round trip: a single `text` partition key survives to_bytes -> from_bytes.
    let name_column = KeyColumn {
        name: String::from("name"),
        data_type: String::from("text"),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: String::from("ks"),
        table: String::from("tbl"),
        partition_keys: vec![name_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let expected = PartitionKey::single("name", Value::Text(String::from("hello")));
    let encoded = expected.to_bytes(&schema).unwrap();
    let round_tripped = PartitionKey::from_bytes(&encoded, &schema).unwrap();
    assert_eq!(expected, round_tripped);
}
1256
#[test]
fn test_partition_key_from_bytes_multi_component() {
    // Round trip: a two-column (text, int) partition key survives
    // to_bytes -> from_bytes.
    let tenant_column = KeyColumn {
        name: String::from("tenant"),
        data_type: String::from("text"),
        position: 0,
    };
    let id_column = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 1,
    };
    let schema = TableSchema {
        keyspace: String::from("ks"),
        table: String::from("tbl"),
        partition_keys: vec![tenant_column, id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    let expected = PartitionKey::new(vec![
        (String::from("tenant"), Value::Text(String::from("acme"))),
        (String::from("id"), Value::Integer(99)),
    ]);
    let encoded = expected.to_bytes(&schema).unwrap();
    let round_tripped = PartitionKey::from_bytes(&encoded, &schema).unwrap();
    assert_eq!(expected, round_tripped);
}
1287
#[test]
fn test_partition_key_from_bytes_empty_errors() {
    // Decoding an empty byte slice against a one-column `int` schema must
    // return an error.
    let id_column = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: String::from("ks"),
        table: String::from("tbl"),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns: vec![],
        comments: HashMap::new(),
    };

    assert!(PartitionKey::from_bytes(&[], &schema).is_err());
}
1305
#[test]
fn test_clustering_key_cmp_type_mismatch_does_not_panic() {
    // Two keys whose only column holds values of different types (Integer vs
    // Text). Heterogeneous SSTables can produce such pairs, e.g. Frozen(List)
    // vs List. `Ord::cmp` must return a deterministic answer, not panic.
    let int_key = ClusteringKey {
        columns: vec![(String::from("col"), Value::Integer(1))],
    };
    let text_key = ClusteringKey {
        columns: vec![(String::from("col"), Value::Text(String::from("1")))],
    };

    // Two identical calls: reaching the assertion proves neither panicked, and
    // the results must agree.
    let first_result = int_key.cmp(&text_key);
    let second_result = int_key.cmp(&text_key);
    assert_eq!(first_result, second_result);

    // Comparing a key with itself yields Equal.
    assert_eq!(int_key.cmp(&int_key), Ordering::Equal);
}
1327}