Skip to main content

cqlite_core/storage/sstable/
key_digest.rs

1//! Cassandra-compatible partition key digest computation
2//!
3//! This module implements the exact key digest algorithm used by Cassandra
4//! for Index.db storage. The digest is computed by:
5//! 1. Parsing partition key bytes according to the schema comparators
6//! 2. Creating byte-comparable encoding of the key components
7//! 3. Computing Murmur3 hash of the byte-comparable representation
8//! 4. Returning the digest bytes in the format expected by Index.db
9
10use crate::error::{Error, Result};
11use crate::schema::registry::ParsingContext;
12use crate::storage::sstable::bti::encoder::ByteComparableEncoder;
13use crate::types::{ComparatorType, Value};
14use murmur3::murmur3_32;
15use std::io::Cursor;
16
17/// Cassandra-compatible key digest computer
18///
19/// This struct provides the exact key digest computation algorithm used by
20/// Cassandra for partition key hashing in Index.db files.
21pub struct KeyDigestComputer {
22    encoder: ByteComparableEncoder,
23}
24
25impl KeyDigestComputer {
26    /// Create a new key digest computer
27    pub fn new() -> Self {
28        Self {
29            encoder: ByteComparableEncoder::new(),
30        }
31    }
32
33    /// Compute the partition key digest for Index.db lookup
34    ///
35    /// This method implements the exact Cassandra algorithm:
36    /// 1. Parse the partition key bytes according to schema comparators
37    /// 2. Create byte-comparable encoding for each component
38    /// 3. Combine components into a single byte-comparable key
39    /// 4. Compute Murmur3 hash with seed 0 (Cassandra default)
40    /// 5. Return the hash as little-endian bytes
41    pub fn compute_partition_key_digest(
42        &mut self,
43        partition_key_bytes: &[u8],
44        parsing_context: &ParsingContext,
45    ) -> Result<Vec<u8>> {
46        // Step 1: Parse partition key bytes into typed values
47        let partition_values = self.parse_partition_key_bytes(
48            partition_key_bytes,
49            &parsing_context.partition_comparators,
50        )?;
51
52        // Step 2: Create byte-comparable encoding for the composite key
53        let byte_comparable_key = self.encoder.encode_composite_key(&partition_values)?;
54
55        // Step 3: Compute Murmur3 hash with seed 0 (Cassandra standard)
56        let mut cursor = Cursor::new(&byte_comparable_key);
57        let hash = murmur3_32(&mut cursor, 0)
58            .map_err(|e| Error::corruption(format!("Failed to compute Murmur3 hash: {}", e)))?;
59
60        // Step 4: Return hash as little-endian bytes (Cassandra format)
61        Ok(hash.to_le_bytes().to_vec())
62    }
63
64    /// Parse partition key bytes into typed values according to comparators
65    ///
66    /// This method handles both single and multi-component partition keys,
67    /// parsing each component according to its type comparator.
68    fn parse_partition_key_bytes(
69        &self,
70        key_bytes: &[u8],
71        partition_comparators: &[ComparatorType],
72    ) -> Result<Vec<Value>> {
73        if partition_comparators.is_empty() {
74            return Err(Error::Schema(
75                "No partition key comparators provided".to_string(),
76            ));
77        }
78
79        // Handle single component partition key
80        if partition_comparators.len() == 1 {
81            let value = self.parse_value_bytes(key_bytes, &partition_comparators[0])?;
82            return Ok(vec![value]);
83        }
84
85        // Handle multi-component partition key
86        // Multi-component keys are encoded as:
87        //   [len][bytes][0x00][len][bytes][0x00]...
88        // with a trailing `0x00` after the final component as well.
89        let mut values = Vec::new();
90        let mut offset = 0;
91
92        for (index, comparator) in partition_comparators.iter().enumerate() {
93            if offset >= key_bytes.len() {
94                return Err(Error::corruption(
95                    "Insufficient bytes for multi-component partition key".to_string(),
96                ));
97            }
98
99            // Read component length (2 bytes, big-endian)
100            if offset + 2 > key_bytes.len() {
101                return Err(Error::corruption(
102                    "Invalid component length in partition key".to_string(),
103                ));
104            }
105
106            let component_len =
107                u16::from_be_bytes([key_bytes[offset], key_bytes[offset + 1]]) as usize;
108            offset += 2;
109
110            // Read component bytes
111            if offset + component_len > key_bytes.len() {
112                return Err(Error::corruption(
113                    "Component length exceeds available bytes".to_string(),
114                ));
115            }
116
117            let component_bytes = &key_bytes[offset..offset + component_len];
118            let value = self.parse_value_bytes(component_bytes, comparator)?;
119            values.push(value);
120            offset += component_len;
121
122            if offset >= key_bytes.len() {
123                return Err(Error::corruption(
124                    "Missing end-of-component marker in multi-component partition key".to_string(),
125                ));
126            }
127            if key_bytes[offset] != 0x00 {
128                return Err(Error::corruption(
129                    "Invalid end-of-component marker in multi-component partition key".to_string(),
130                ));
131            }
132            offset += 1;
133
134            if index + 1 == partition_comparators.len() && offset != key_bytes.len() {
135                return Err(Error::corruption(
136                    "Unexpected trailing bytes in multi-component partition key".to_string(),
137                ));
138            }
139        }
140
141        Ok(values)
142    }
143
144    /// Parse bytes for a single value according to its comparator type
145    fn parse_value_bytes(&self, bytes: &[u8], comparator: &ComparatorType) -> Result<Value> {
146        match comparator {
147            ComparatorType::Boolean => {
148                if bytes.len() != 1 {
149                    return Err(Error::corruption("Invalid boolean bytes".to_string()));
150                }
151                Ok(Value::Boolean(bytes[0] != 0))
152            }
153            ComparatorType::TinyInt => {
154                if bytes.len() != 1 {
155                    return Err(Error::corruption("Invalid tinyint bytes".to_string()));
156                }
157                Ok(Value::TinyInt(bytes[0] as i8))
158            }
159            ComparatorType::SmallInt => {
160                if bytes.len() != 2 {
161                    return Err(Error::corruption("Invalid smallint bytes".to_string()));
162                }
163                let value = i16::from_be_bytes([bytes[0], bytes[1]]);
164                Ok(Value::SmallInt(value))
165            }
166            ComparatorType::Int => {
167                if bytes.len() != 4 {
168                    return Err(Error::corruption("Invalid int bytes".to_string()));
169                }
170                let value = i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
171                Ok(Value::Integer(value))
172            }
173            ComparatorType::BigInt => {
174                if bytes.len() != 8 {
175                    return Err(Error::corruption("Invalid bigint bytes".to_string()));
176                }
177                let value = i64::from_be_bytes([
178                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
179                ]);
180                Ok(Value::BigInt(value))
181            }
182            ComparatorType::Counter => {
183                if bytes.len() != 8 {
184                    return Err(Error::corruption("Invalid counter bytes".to_string()));
185                }
186                let value = i64::from_be_bytes([
187                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
188                ]);
189                Ok(Value::Counter(value))
190            }
191            ComparatorType::Float32 => {
192                if bytes.len() != 4 {
193                    return Err(Error::corruption("Invalid float32 bytes".to_string()));
194                }
195                let bits = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
196                let value = f32::from_bits(bits);
197                Ok(Value::Float32(value))
198            }
199            ComparatorType::Float => {
200                if bytes.len() != 8 {
201                    return Err(Error::corruption("Invalid float bytes".to_string()));
202                }
203                let bits = u64::from_be_bytes([
204                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
205                ]);
206                let value = f64::from_bits(bits);
207                Ok(Value::Float(value))
208            }
209            ComparatorType::Text => {
210                let text = String::from_utf8(bytes.to_vec())
211                    .map_err(|e| Error::corruption(format!("Invalid UTF-8 in text: {}", e)))?;
212                Ok(Value::Text(text))
213            }
214            ComparatorType::Blob => Ok(Value::Blob(bytes.to_vec())),
215            ComparatorType::Timestamp => {
216                if bytes.len() != 8 {
217                    return Err(Error::corruption("Invalid timestamp bytes".to_string()));
218                }
219                let millis = i64::from_be_bytes([
220                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
221                ]);
222                Ok(Value::Timestamp(millis))
223            }
224            ComparatorType::Uuid => {
225                if bytes.len() != 16 {
226                    return Err(Error::corruption("Invalid UUID bytes".to_string()));
227                }
228                let uuid_bytes: [u8; 16] = bytes
229                    .try_into()
230                    .map_err(|_| Error::invalid_format("Invalid UUID byte length"))?;
231                Ok(Value::Uuid(uuid_bytes))
232            }
233            ComparatorType::Date => {
234                if bytes.len() != 4 {
235                    return Err(Error::corruption("Invalid date bytes".to_string()));
236                }
237                // Cassandra DATE: 4-byte big-endian unsigned int with Integer.MIN_VALUE offset
238                let stored = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
239                let days_since_epoch = stored.wrapping_add(i32::MIN as u32) as i32;
240                Ok(Value::Date(days_since_epoch))
241            }
242            // For complex types, we need more sophisticated parsing
243            // For now, treat them as blobs to avoid breaking existing functionality
244            ComparatorType::List(_)
245            | ComparatorType::Set(_)
246            | ComparatorType::Map(_, _)
247            | ComparatorType::Tuple(_)
248            | ComparatorType::Udt { .. }
249            | ComparatorType::Frozen(_)
250            | ComparatorType::Custom(_)
251            | ComparatorType::Varint
252            | ComparatorType::Decimal
253            | ComparatorType::Duration
254            | ComparatorType::Json => {
255                log::warn!(
256                    "Complex type {} in partition key - using blob fallback",
257                    comparator.type_name()
258                );
259                Ok(Value::Blob(bytes.to_vec()))
260            }
261        }
262    }
263
264    /// Compute a simple hash-based digest (fallback for when schema is unavailable)
265    ///
266    /// This method provides compatibility with the existing implementation
267    /// when full schema information is not available.
268    pub fn compute_simple_digest(&self, partition_key: &[u8]) -> Result<Vec<u8>> {
269        // Use Murmur3 instead of DefaultHasher for better Cassandra compatibility
270        let mut cursor = Cursor::new(partition_key);
271        let hash = murmur3_32(&mut cursor, 0)
272            .map_err(|e| Error::corruption(format!("Failed to compute Murmur3 hash: {}", e)))?;
273
274        Ok(hash.to_le_bytes().to_vec())
275    }
276}
277
278impl Default for KeyDigestComputer {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{KeyColumn, TableSchema};
    use std::collections::HashMap;

    /// Big-endian serialization of the 32-bit integer 42.
    const INT_42: [u8; 4] = [0x00, 0x00, 0x00, 0x2A];

    /// Wraps the given partition comparators in a `ParsingContext` built
    /// around a minimal `test.table` schema with one `pk int` partition column.
    fn create_test_parsing_context(partition_comparators: Vec<ComparatorType>) -> ParsingContext {
        let pk = KeyColumn {
            name: "pk".to_string(),
            data_type: "int".to_string(),
            position: 0,
        };

        ParsingContext {
            schema: TableSchema {
                keyspace: "test".to_string(),
                table: "table".to_string(),
                partition_keys: vec![pk],
                clustering_keys: vec![],
                columns: vec![],
                comments: HashMap::new(),
            },
            partition_comparators,
            clustering_comparators: vec![],
            column_comparators: HashMap::new(),
        }
    }

    /// Appends `[2-byte big-endian length][payload]` to `out`, followed by the
    /// `0x00` end-of-component byte when `terminated` is true.
    fn push_component(out: &mut Vec<u8>, payload: &[u8], terminated: bool) {
        let len = u16::try_from(payload.len()).expect("test payload fits in u16");
        out.extend_from_slice(&len.to_be_bytes());
        out.extend_from_slice(payload);
        if terminated {
            out.push(0x00);
        }
    }

    #[test]
    fn test_single_component_int_key() {
        let context = create_test_parsing_context(vec![ComparatorType::Int]);
        let mut computer = KeyDigestComputer::new();

        let first = computer
            .compute_partition_key_digest(&INT_42, &context)
            .unwrap();
        // A 32-bit hash serializes to exactly four bytes.
        assert_eq!(first.len(), 4);

        // Hashing the same key again must reproduce the same digest.
        let second = computer
            .compute_partition_key_digest(&INT_42, &context)
            .unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn test_single_component_text_key() {
        let context = create_test_parsing_context(vec![ComparatorType::Text]);
        let mut computer = KeyDigestComputer::new();

        let digest = computer
            .compute_partition_key_digest(b"hello", &context)
            .unwrap();

        // A 32-bit hash serializes to exactly four bytes.
        assert_eq!(digest.len(), 4);
    }

    #[test]
    fn test_multi_component_key() {
        let context = create_test_parsing_context(vec![ComparatorType::Int, ComparatorType::Text]);
        let mut computer = KeyDigestComputer::new();

        // int(42) + text("hello"), each framed as [len][bytes][0x00].
        let mut key_bytes = Vec::new();
        push_component(&mut key_bytes, &INT_42, true);
        push_component(&mut key_bytes, b"hello", true);

        let digest = computer
            .compute_partition_key_digest(&key_bytes, &context)
            .unwrap();

        // A 32-bit hash serializes to exactly four bytes.
        assert_eq!(digest.len(), 4);
    }

    #[test]
    fn test_multi_component_key_rejects_missing_final_separator() {
        let context = create_test_parsing_context(vec![ComparatorType::Int, ComparatorType::Text]);
        let mut computer = KeyDigestComputer::new();

        // Same key as the happy path, but the last component has no 0x00 marker.
        let mut key_bytes = Vec::new();
        push_component(&mut key_bytes, &INT_42, true);
        push_component(&mut key_bytes, b"hello", false);

        let err = computer
            .compute_partition_key_digest(&key_bytes, &context)
            .expect_err("missing final separator must be rejected");

        assert!(
            err.to_string().contains("Missing end-of-component marker"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_simple_digest_fallback() -> Result<()> {
        let computer = KeyDigestComputer::new();
        let key_bytes = b"test_key";

        let first = computer.compute_simple_digest(key_bytes)?;
        // A 32-bit hash serializes to exactly four bytes.
        assert_eq!(first.len(), 4);

        // The fallback must be deterministic as well.
        let second = computer.compute_simple_digest(key_bytes)?;
        assert_eq!(first, second);
        Ok(())
    }

    #[test]
    fn test_byte_ordering_equivalence() {
        let context = create_test_parsing_context(vec![ComparatorType::Int]);
        let mut computer = KeyDigestComputer::new();

        let one = [0x00, 0x00, 0x00, 0x01];
        let two = [0x00, 0x00, 0x00, 0x02];

        let digest_one = computer
            .compute_partition_key_digest(&one, &context)
            .unwrap();
        let digest_two = computer
            .compute_partition_key_digest(&two, &context)
            .unwrap();

        // Hash order is unrelated to value order; the only expectation is
        // that these two distinct keys do not collide.
        assert_ne!(digest_one, digest_two);
    }
}