cqlite_core/storage/sstable/
key_digest.rs1use crate::error::{Error, Result};
11use crate::schema::registry::ParsingContext;
12use crate::storage::sstable::bti::encoder::ByteComparableEncoder;
13use crate::types::{ComparatorType, Value};
14use murmur3::murmur3_32;
15use std::io::Cursor;
16
17pub struct KeyDigestComputer {
22 encoder: ByteComparableEncoder,
23}
24
25impl KeyDigestComputer {
26 pub fn new() -> Self {
28 Self {
29 encoder: ByteComparableEncoder::new(),
30 }
31 }
32
33 pub fn compute_partition_key_digest(
42 &mut self,
43 partition_key_bytes: &[u8],
44 parsing_context: &ParsingContext,
45 ) -> Result<Vec<u8>> {
46 let partition_values = self.parse_partition_key_bytes(
48 partition_key_bytes,
49 &parsing_context.partition_comparators,
50 )?;
51
52 let byte_comparable_key = self.encoder.encode_composite_key(&partition_values)?;
54
55 let mut cursor = Cursor::new(&byte_comparable_key);
57 let hash = murmur3_32(&mut cursor, 0)
58 .map_err(|e| Error::corruption(format!("Failed to compute Murmur3 hash: {}", e)))?;
59
60 Ok(hash.to_le_bytes().to_vec())
62 }
63
64 fn parse_partition_key_bytes(
69 &self,
70 key_bytes: &[u8],
71 partition_comparators: &[ComparatorType],
72 ) -> Result<Vec<Value>> {
73 if partition_comparators.is_empty() {
74 return Err(Error::Schema(
75 "No partition key comparators provided".to_string(),
76 ));
77 }
78
79 if partition_comparators.len() == 1 {
81 let value = self.parse_value_bytes(key_bytes, &partition_comparators[0])?;
82 return Ok(vec![value]);
83 }
84
85 let mut values = Vec::new();
90 let mut offset = 0;
91
92 for (index, comparator) in partition_comparators.iter().enumerate() {
93 if offset >= key_bytes.len() {
94 return Err(Error::corruption(
95 "Insufficient bytes for multi-component partition key".to_string(),
96 ));
97 }
98
99 if offset + 2 > key_bytes.len() {
101 return Err(Error::corruption(
102 "Invalid component length in partition key".to_string(),
103 ));
104 }
105
106 let component_len =
107 u16::from_be_bytes([key_bytes[offset], key_bytes[offset + 1]]) as usize;
108 offset += 2;
109
110 if offset + component_len > key_bytes.len() {
112 return Err(Error::corruption(
113 "Component length exceeds available bytes".to_string(),
114 ));
115 }
116
117 let component_bytes = &key_bytes[offset..offset + component_len];
118 let value = self.parse_value_bytes(component_bytes, comparator)?;
119 values.push(value);
120 offset += component_len;
121
122 if offset >= key_bytes.len() {
123 return Err(Error::corruption(
124 "Missing end-of-component marker in multi-component partition key".to_string(),
125 ));
126 }
127 if key_bytes[offset] != 0x00 {
128 return Err(Error::corruption(
129 "Invalid end-of-component marker in multi-component partition key".to_string(),
130 ));
131 }
132 offset += 1;
133
134 if index + 1 == partition_comparators.len() && offset != key_bytes.len() {
135 return Err(Error::corruption(
136 "Unexpected trailing bytes in multi-component partition key".to_string(),
137 ));
138 }
139 }
140
141 Ok(values)
142 }
143
144 fn parse_value_bytes(&self, bytes: &[u8], comparator: &ComparatorType) -> Result<Value> {
146 match comparator {
147 ComparatorType::Boolean => {
148 if bytes.len() != 1 {
149 return Err(Error::corruption("Invalid boolean bytes".to_string()));
150 }
151 Ok(Value::Boolean(bytes[0] != 0))
152 }
153 ComparatorType::TinyInt => {
154 if bytes.len() != 1 {
155 return Err(Error::corruption("Invalid tinyint bytes".to_string()));
156 }
157 Ok(Value::TinyInt(bytes[0] as i8))
158 }
159 ComparatorType::SmallInt => {
160 if bytes.len() != 2 {
161 return Err(Error::corruption("Invalid smallint bytes".to_string()));
162 }
163 let value = i16::from_be_bytes([bytes[0], bytes[1]]);
164 Ok(Value::SmallInt(value))
165 }
166 ComparatorType::Int => {
167 if bytes.len() != 4 {
168 return Err(Error::corruption("Invalid int bytes".to_string()));
169 }
170 let value = i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
171 Ok(Value::Integer(value))
172 }
173 ComparatorType::BigInt => {
174 if bytes.len() != 8 {
175 return Err(Error::corruption("Invalid bigint bytes".to_string()));
176 }
177 let value = i64::from_be_bytes([
178 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
179 ]);
180 Ok(Value::BigInt(value))
181 }
182 ComparatorType::Counter => {
183 if bytes.len() != 8 {
184 return Err(Error::corruption("Invalid counter bytes".to_string()));
185 }
186 let value = i64::from_be_bytes([
187 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
188 ]);
189 Ok(Value::Counter(value))
190 }
191 ComparatorType::Float32 => {
192 if bytes.len() != 4 {
193 return Err(Error::corruption("Invalid float32 bytes".to_string()));
194 }
195 let bits = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
196 let value = f32::from_bits(bits);
197 Ok(Value::Float32(value))
198 }
199 ComparatorType::Float => {
200 if bytes.len() != 8 {
201 return Err(Error::corruption("Invalid float bytes".to_string()));
202 }
203 let bits = u64::from_be_bytes([
204 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
205 ]);
206 let value = f64::from_bits(bits);
207 Ok(Value::Float(value))
208 }
209 ComparatorType::Text => {
210 let text = String::from_utf8(bytes.to_vec())
211 .map_err(|e| Error::corruption(format!("Invalid UTF-8 in text: {}", e)))?;
212 Ok(Value::Text(text))
213 }
214 ComparatorType::Blob => Ok(Value::Blob(bytes.to_vec())),
215 ComparatorType::Timestamp => {
216 if bytes.len() != 8 {
217 return Err(Error::corruption("Invalid timestamp bytes".to_string()));
218 }
219 let millis = i64::from_be_bytes([
220 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
221 ]);
222 Ok(Value::Timestamp(millis))
223 }
224 ComparatorType::Uuid => {
225 if bytes.len() != 16 {
226 return Err(Error::corruption("Invalid UUID bytes".to_string()));
227 }
228 let uuid_bytes: [u8; 16] = bytes
229 .try_into()
230 .map_err(|_| Error::invalid_format("Invalid UUID byte length"))?;
231 Ok(Value::Uuid(uuid_bytes))
232 }
233 ComparatorType::Date => {
234 if bytes.len() != 4 {
235 return Err(Error::corruption("Invalid date bytes".to_string()));
236 }
237 let stored = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
239 let days_since_epoch = stored.wrapping_add(i32::MIN as u32) as i32;
240 Ok(Value::Date(days_since_epoch))
241 }
242 ComparatorType::List(_)
245 | ComparatorType::Set(_)
246 | ComparatorType::Map(_, _)
247 | ComparatorType::Tuple(_)
248 | ComparatorType::Udt { .. }
249 | ComparatorType::Frozen(_)
250 | ComparatorType::Custom(_)
251 | ComparatorType::Varint
252 | ComparatorType::Decimal
253 | ComparatorType::Duration
254 | ComparatorType::Json => {
255 log::warn!(
256 "Complex type {} in partition key - using blob fallback",
257 comparator.type_name()
258 );
259 Ok(Value::Blob(bytes.to_vec()))
260 }
261 }
262 }
263
264 pub fn compute_simple_digest(&self, partition_key: &[u8]) -> Result<Vec<u8>> {
269 let mut cursor = Cursor::new(partition_key);
271 let hash = murmur3_32(&mut cursor, 0)
272 .map_err(|e| Error::corruption(format!("Failed to compute Murmur3 hash: {}", e)))?;
273
274 Ok(hash.to_le_bytes().to_vec())
275 }
276}
277
278impl Default for KeyDigestComputer {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use crate::schema::{KeyColumn, TableSchema};
288 use std::collections::HashMap;
289
290 fn create_test_parsing_context(partition_comparators: Vec<ComparatorType>) -> ParsingContext {
291 let schema = TableSchema {
292 keyspace: "test".to_string(),
293 table: "table".to_string(),
294 partition_keys: vec![KeyColumn {
295 name: "pk".to_string(),
296 data_type: "int".to_string(),
297 position: 0,
298 }],
299 clustering_keys: vec![],
300 columns: vec![],
301 comments: HashMap::new(),
302 };
303
304 ParsingContext {
305 schema,
306 partition_comparators,
307 clustering_comparators: vec![],
308 column_comparators: HashMap::new(),
309 }
310 }
311
312 #[test]
313 fn test_single_component_int_key() {
314 let mut computer = KeyDigestComputer::new();
315 let context = create_test_parsing_context(vec![ComparatorType::Int]);
316
317 let key_bytes = [0x00, 0x00, 0x00, 0x2A]; let digest = computer
321 .compute_partition_key_digest(&key_bytes, &context)
322 .unwrap();
323
324 assert_eq!(digest.len(), 4);
326
327 let digest2 = computer
329 .compute_partition_key_digest(&key_bytes, &context)
330 .unwrap();
331 assert_eq!(digest, digest2);
332 }
333
334 #[test]
335 fn test_single_component_text_key() {
336 let mut computer = KeyDigestComputer::new();
337 let context = create_test_parsing_context(vec![ComparatorType::Text]);
338
339 let key_bytes = b"hello";
340
341 let digest = computer
342 .compute_partition_key_digest(key_bytes, &context)
343 .unwrap();
344
345 assert_eq!(digest.len(), 4);
347 }
348
349 #[test]
350 fn test_multi_component_key() {
351 let mut computer = KeyDigestComputer::new();
352 let context = create_test_parsing_context(vec![ComparatorType::Int, ComparatorType::Text]);
353
354 let mut key_bytes = Vec::new();
357 key_bytes.extend_from_slice(&[0x00, 0x04]); key_bytes.extend_from_slice(&[0x00, 0x00, 0x00, 0x2A]); key_bytes.push(0x00); key_bytes.extend_from_slice(&[0x00, 0x05]); key_bytes.extend_from_slice(b"hello"); key_bytes.push(0x00); let digest = computer
365 .compute_partition_key_digest(&key_bytes, &context)
366 .unwrap();
367
368 assert_eq!(digest.len(), 4);
370 }
371
372 #[test]
373 fn test_multi_component_key_rejects_missing_final_separator() {
374 let mut computer = KeyDigestComputer::new();
375 let context = create_test_parsing_context(vec![ComparatorType::Int, ComparatorType::Text]);
376
377 let mut key_bytes = Vec::new();
378 key_bytes.extend_from_slice(&[0x00, 0x04]);
379 key_bytes.extend_from_slice(&[0x00, 0x00, 0x00, 0x2A]);
380 key_bytes.push(0x00);
381 key_bytes.extend_from_slice(&[0x00, 0x05]);
382 key_bytes.extend_from_slice(b"hello");
383
384 let err = computer
385 .compute_partition_key_digest(&key_bytes, &context)
386 .expect_err("missing final separator must be rejected");
387
388 assert!(
389 err.to_string().contains("Missing end-of-component marker"),
390 "unexpected error: {err}"
391 );
392 }
393
394 #[test]
395 fn test_simple_digest_fallback() -> Result<()> {
396 let computer = KeyDigestComputer::new();
397 let key_bytes = b"test_key";
398
399 let digest = computer.compute_simple_digest(key_bytes)?;
400
401 assert_eq!(digest.len(), 4);
403
404 let digest2 = computer.compute_simple_digest(key_bytes)?;
406 assert_eq!(digest, digest2);
407 Ok(())
408 }
409
410 #[test]
411 fn test_byte_ordering_equivalence() {
412 let mut computer = KeyDigestComputer::new();
413 let context = create_test_parsing_context(vec![ComparatorType::Int]);
414
415 let key1_bytes = [0x00, 0x00, 0x00, 0x01]; let key2_bytes = [0x00, 0x00, 0x00, 0x02]; let digest1 = computer
420 .compute_partition_key_digest(&key1_bytes, &context)
421 .unwrap();
422 let digest2 = computer
423 .compute_partition_key_digest(&key2_bytes, &context)
424 .unwrap();
425
426 assert_ne!(digest1, digest2);
428 }
429}