Skip to main content

cqlite_core/storage/sstable/bti/
encoder.rs

1//! CEP-25 Compliant Byte-comparable key encoding for BTI format
2//!
3//! Converts CQL keys to byte sequences where lexicographic comparison
4//! of unsigned bytes produces the same result as typed comparison.
5//!
6//! This implementation follows the CEP-25 specification for Cassandra 5.0
7//! byte-comparable key encoding used in trie-indexed SSTables.
8//!
9//! Key Requirements:
10//! - Lexicographic byte comparison must match typed comparison
11//! - Support for all Cassandra 5.0 data types  
12//! - Proper null value handling with type prefixes
13//! - Variable-length encoding for efficiency
14//! - Nested type support (collections, UDTs, tuples)
15//! - Escape sequences for proper ordering
16
17use super::BtiError;
18use crate::error::Result;
19use crate::types::{UdtValue, Value};
20
/// CEP-25 compliant byte-comparable key encoder
///
/// Turns [`Value`]s into byte strings intended to sort, under unsigned
/// lexicographic comparison, the same way the typed values sort.
///
/// The encoder owns a reusable scratch buffer: each public `encode_*` call
/// clears it, appends the encoding, and returns a copy of the bytes.
pub struct ByteComparableEncoder {
    /// Scratch buffer the encoding is assembled in; cleared at the start of
    /// every public encode call.
    buffer: Vec<u8>,
    /// Options consulted while encoding (length-prefix style, nesting limit).
    config: EncoderConfig,
}
32
/// Tunable options for [`ByteComparableEncoder`].
#[derive(Debug, Clone)]
pub struct EncoderConfig {
    /// When `true`, collection/tuple/UDT element counts are written as a
    /// variable-length integer; when `false`, as a 4-byte big-endian `u32`.
    pub use_varint_encoding: bool,
    /// Largest nesting depth accepted before encoding fails with an error.
    pub max_nesting_depth: usize,
    /// Enable prefix compression for collections.
    // NOTE(review): not read by any encoder code visible in this file.
    pub enable_prefix_compression: bool,
    /// Strict CEP-25 compliance mode.
    // NOTE(review): not read by any encoder code visible in this file.
    pub strict_compliance: bool,
}

impl Default for EncoderConfig {
    /// Defaults: varint lengths on, nesting limit 32, prefix compression on,
    /// strict compliance on.
    fn default() -> Self {
        EncoderConfig {
            strict_compliance: true,
            enable_prefix_compression: true,
            max_nesting_depth: 32,
            use_varint_encoding: true,
        }
    }
}
56
/// Type prefixes for byte-comparable encoding as per CEP-25
///
/// Every encoded value starts with one of these tag bytes. Note that the
/// structural markers at the bottom reuse values of type tags:
/// `SEPARATOR == NULL == 0x00` and `TERMINATOR == BOOLEAN_FALSE == 0x01`.
mod type_prefixes {
    // --- null / boolean (the tag byte alone is the whole boolean encoding) ---
    pub const NULL: u8 = 0x00;
    pub const BOOLEAN_FALSE: u8 = 0x01;
    pub const BOOLEAN_TRUE: u8 = 0x02;
    // --- fixed-width integers ---
    pub const TINYINT: u8 = 0x10;
    pub const SMALLINT: u8 = 0x11;
    pub const INTEGER: u8 = 0x12;
    pub const BIGINT: u8 = 0x13;
    // --- floating point / arbitrary precision ---
    pub const FLOAT: u8 = 0x20;
    pub const DOUBLE: u8 = 0x21;
    // Unused here: the encoder in this file writes decimals with the BLOB tag.
    #[allow(dead_code)]
    pub const DECIMAL: u8 = 0x22;
    // Unused here: the encoder in this file writes varints with the BLOB tag.
    #[allow(dead_code)]
    pub const VARINT: u8 = 0x23;
    // --- variable-length byte sequences ---
    pub const TEXT: u8 = 0x30;
    pub const BLOB: u8 = 0x31;
    // --- identifiers and temporal types ---
    pub const UUID: u8 = 0x40;
    pub const TIMESTAMP: u8 = 0x41;
    // NOTE(review): DATE, TIME and DURATION are referenced by the encoder in
    // this file, so their `allow(dead_code)` attributes look stale.
    #[allow(dead_code)]
    pub const DATE: u8 = 0x42;
    #[allow(dead_code)]
    pub const TIME: u8 = 0x43;
    #[allow(dead_code)]
    pub const DURATION: u8 = 0x44;
    // --- collections ---
    pub const LIST: u8 = 0x50;
    pub const SET: u8 = 0x51;
    pub const MAP: u8 = 0x52;
    // --- composite types ---
    pub const TUPLE: u8 = 0x60;
    pub const UDT: u8 = 0x61;
    // Wrapper tag written before the encoding of a frozen inner value.
    pub const FROZEN: u8 = 0x70;
    // Unused here: the encoder in this file writes tombstones as NULL + 0xFF.
    #[allow(dead_code)]
    pub const TOMBSTONE: u8 = 0x80;
    #[allow(dead_code)]
    pub const ESCAPE: u8 = 0xFF;
    // --- structural markers (values overlap NULL / BOOLEAN_FALSE above) ---
    pub const SEPARATOR: u8 = 0x00;
    pub const TERMINATOR: u8 = 0x01;
}
95
/// Escape sequences for special bytes in values
///
/// Each escaped form is the escape byte `0xFF` followed by a second byte.
// NOTE(review): `ESCAPED_NULL` and `ESCAPED_ESCAPE` are used by the text and
// blob encoders in this file, so their `allow(dead_code)` attributes look
// stale; `ESCAPED_SEPARATOR` has no visible user.
mod escape_sequences {
    /// Lead byte of every escape sequence.
    #[allow(dead_code)]
    pub const ESCAPE_BYTE: u8 = 0xFF;
    /// Replacement for a literal `0x00` byte inside a value.
    #[allow(dead_code)]
    pub const ESCAPED_NULL: &[u8] = &[0xFF, 0x00];
    /// Replacement for a literal `0xFF` byte inside a value.
    #[allow(dead_code)]
    pub const ESCAPED_ESCAPE: &[u8] = &[0xFF, 0xFF];
    /// Escape form for a literal `0x01` byte.
    #[allow(dead_code)]
    pub const ESCAPED_SEPARATOR: &[u8] = &[0xFF, 0x01];
}
107
108impl Default for ByteComparableEncoder {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114impl ByteComparableEncoder {
115    /// Create new encoder with default configuration
116    pub fn new() -> Self {
117        Self {
118            buffer: Vec::new(),
119            config: EncoderConfig::default(),
120        }
121    }
122
123    /// Create new encoder with custom configuration
124    pub fn with_config(config: EncoderConfig) -> Self {
125        Self {
126            buffer: Vec::new(),
127            config,
128        }
129    }
130
131    /// Get current configuration
132    pub fn config(&self) -> &EncoderConfig {
133        &self.config
134    }
135
136    /// Set new configuration
137    pub fn set_config(&mut self, config: EncoderConfig) {
138        self.config = config;
139    }
140
141    /// Encode a single value to byte-comparable format
142    pub fn encode_value(&mut self, value: &Value) -> Result<Vec<u8>> {
143        self.buffer.clear();
144        self.encode_value_to_buffer(value)?;
145        Ok(self.buffer.clone())
146    }
147
148    /// Encode a composite key (multiple values) to byte-comparable format
149    pub fn encode_composite_key(&mut self, values: &[Value]) -> Result<Vec<u8>> {
150        self.buffer.clear();
151
152        for (i, value) in values.iter().enumerate() {
153            if i > 0 {
154                // Add separator byte between key components
155                self.buffer.push(0x00);
156            }
157            self.encode_value_to_buffer(value)?;
158        }
159
160        Ok(self.buffer.clone())
161    }
162
163    /// Encode value directly to internal buffer with depth tracking
164    fn encode_value_to_buffer(&mut self, value: &Value) -> Result<()> {
165        self.encode_value_to_buffer_with_depth(value, 0)
166    }
167
168    /// Encode value with nesting depth tracking
169    fn encode_value_to_buffer_with_depth(&mut self, value: &Value, depth: usize) -> Result<()> {
170        if depth > self.config.max_nesting_depth {
171            return Err(BtiError::InvalidByteComparableKey(format!(
172                "Maximum nesting depth {} exceeded",
173                self.config.max_nesting_depth
174            ))
175            .into());
176        }
177
178        match value {
179            Value::Null => self.encode_null(),
180            Value::Boolean(b) => self.encode_boolean(*b),
181            Value::TinyInt(i) => self.encode_tinyint(*i),
182            Value::SmallInt(i) => self.encode_smallint(*i),
183            Value::Integer(i) => self.encode_int(*i),
184            Value::BigInt(i) => self.encode_bigint(*i),
185            Value::Counter(c) => self.encode_bigint(*c), // Counter encoded as bigint
186            Value::Float32(f) => self.encode_float32(*f),
187            Value::Float(f) => self.encode_double(*f),
188            Value::Text(s) => self.encode_text(s),
189            Value::Blob(bytes) => self.encode_blob(bytes),
190            Value::Uuid(uuid) => self.encode_uuid_bytes(uuid),
191            Value::Timestamp(ts) => self.encode_timestamp(*ts),
192            Value::Json(json) => self.encode_json(json),
193            Value::List(items) => self.encode_list_with_depth(items, depth + 1),
194            Value::Set(items) => self.encode_set_with_depth(items, depth + 1),
195            Value::Map(map) => self.encode_map_with_depth(map, depth + 1),
196            Value::Tuple(items) => self.encode_tuple_with_depth(items, depth + 1),
197            Value::Udt(udt) => self.encode_udt_with_depth(udt, depth + 1),
198            Value::Frozen(inner) => {
199                self.buffer.push(type_prefixes::FROZEN);
200                self.encode_value_to_buffer_with_depth(inner, depth + 1)
201            }
202            Value::Varint(data) => {
203                // For BTI encoding, treat varint as blob since we don't have the original value
204                self.encode_blob(data)
205            }
206            Value::Decimal { scale: _, unscaled } => self.encode_blob(unscaled), // Treat decimal as blob for BTI
207            Value::Duration {
208                months,
209                days,
210                nanos,
211            } => {
212                // Encode duration as a composite key
213                self.buffer.push(type_prefixes::DURATION);
214                self.encode_int(*months)?;
215                self.encode_int(*days)?;
216                self.encode_bigint(*nanos)?;
217                Ok(())
218            }
219            Value::Tombstone(_) => {
220                // Tombstones are encoded as null with special marker
221                self.buffer.push(type_prefixes::NULL);
222                self.buffer.push(0xFF); // Special tombstone marker
223                Ok(())
224            }
225            Value::Date(d) => {
226                self.buffer.push(type_prefixes::DATE);
227                self.encode_int(*d)
228            }
229            Value::Time(t) => {
230                self.buffer.push(type_prefixes::TIME);
231                self.encode_bigint(*t)
232            }
233            Value::Inet(bytes) => self.encode_blob(bytes),
234        }
235    }
236
237    /// Encode null value with proper type prefix
238    fn encode_null(&mut self) -> Result<()> {
239        self.buffer.push(type_prefixes::NULL);
240        Ok(())
241    }
242
243    /// Encode text/varchar with proper UTF-8 ordering and escape sequences
244    fn encode_text(&mut self, text: &str) -> Result<()> {
245        self.buffer.push(type_prefixes::TEXT);
246
247        // Encode UTF-8 bytes with proper escaping
248        for &byte in text.as_bytes() {
249            match byte {
250                0x00 => self
251                    .buffer
252                    .extend_from_slice(escape_sequences::ESCAPED_NULL),
253                0xFF => self
254                    .buffer
255                    .extend_from_slice(escape_sequences::ESCAPED_ESCAPE),
256                _ => self.buffer.push(byte),
257            }
258        }
259
260        // Add terminator for proper ordering
261        self.buffer.push(type_prefixes::TERMINATOR);
262        Ok(())
263    }
264
265    /// Encode JSON as escaped text
266    fn encode_json(&mut self, json: &serde_json::Value) -> Result<()> {
267        let json_str = json.to_string();
268        self.encode_text(&json_str)
269    }
270
271    /// Encode tinyint (i8) with proper ordering
272    fn encode_tinyint(&mut self, value: i8) -> Result<()> {
273        self.buffer.push(type_prefixes::TINYINT);
274        // Transform to unsigned for proper lexicographic ordering
275        let unsigned = (value as i16 + 128) as u8;
276        self.buffer.push(unsigned);
277        Ok(())
278    }
279
280    /// Encode smallint (i16) with proper ordering
281    fn encode_smallint(&mut self, value: i16) -> Result<()> {
282        self.buffer.push(type_prefixes::SMALLINT);
283        // Transform to unsigned for proper lexicographic ordering
284        let unsigned = (value as i32 + 32768) as u16;
285        self.buffer.extend_from_slice(&unsigned.to_be_bytes());
286        Ok(())
287    }
288
289    /// Encode integer (i32) with sign-magnitude encoding
290    fn encode_int(&mut self, value: i32) -> Result<()> {
291        self.buffer.push(type_prefixes::INTEGER);
292
293        // Use two's complement transformation for proper ordering
294        // Transform signed to unsigned preserving order: flip sign bit
295        let unsigned = (value as u32) ^ 0x8000_0000;
296
297        self.buffer.extend_from_slice(&unsigned.to_be_bytes());
298        Ok(())
299    }
300
301    /// Encode bigint (i64) with sign-magnitude encoding
302    fn encode_bigint(&mut self, value: i64) -> Result<()> {
303        self.buffer.push(type_prefixes::BIGINT);
304
305        // Use two's complement transformation for proper ordering
306        let unsigned = if value >= 0 {
307            (value as u64) + 0x8000_0000_0000_0000
308        } else {
309            (value as u64) ^ 0xFFFF_FFFF_FFFF_FFFF
310        };
311
312        self.buffer.extend_from_slice(&unsigned.to_be_bytes());
313        Ok(())
314    }
315
316    /// Encode UUID bytes with proper byte ordering
317    fn encode_uuid_bytes(&mut self, uuid: &[u8; 16]) -> Result<()> {
318        self.buffer.push(type_prefixes::UUID);
319        // UUID bytes in network byte order are naturally comparable
320        self.buffer.extend_from_slice(uuid);
321        Ok(())
322    }
323
324    /// Encode timestamp (microseconds since epoch)
325    fn encode_timestamp(&mut self, timestamp: i64) -> Result<()> {
326        self.buffer.push(type_prefixes::TIMESTAMP);
327
328        // Transform for proper ordering (timestamps can be negative)
329        let unsigned = if timestamp >= 0 {
330            (timestamp as u64) + 0x8000_0000_0000_0000
331        } else {
332            (timestamp as u64) ^ 0xFFFF_FFFF_FFFF_FFFF
333        };
334
335        self.buffer.extend_from_slice(&unsigned.to_be_bytes());
336        Ok(())
337    }
338
339    /// Encode boolean with proper type prefixes
340    fn encode_boolean(&mut self, value: bool) -> Result<()> {
341        if value {
342            self.buffer.push(type_prefixes::BOOLEAN_TRUE);
343        } else {
344            self.buffer.push(type_prefixes::BOOLEAN_FALSE);
345        }
346        Ok(())
347    }
348
349    /// Encode float32 with IEEE 754 ordering adjustment
350    fn encode_float32(&mut self, value: f32) -> Result<()> {
351        self.buffer.push(type_prefixes::FLOAT);
352
353        // Handle special values first
354        if value.is_nan() {
355            // NaN sorts after all other values
356            self.buffer.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]);
357            return Ok(());
358        }
359
360        let bits = value.to_bits();
361
362        // Adjust for proper ordering of IEEE 754 floats
363        let adjusted = if (bits & 0x8000_0000) == 0 {
364            // Positive: add sign bit offset
365            bits | 0x8000_0000
366        } else {
367            // Negative: flip all bits
368            !bits
369        };
370
371        self.buffer.extend_from_slice(&adjusted.to_be_bytes());
372        Ok(())
373    }
374
375    /// Encode double (f64) with IEEE 754 ordering adjustment
376    fn encode_double(&mut self, value: f64) -> Result<()> {
377        self.buffer.push(type_prefixes::DOUBLE);
378
379        // Handle special values first
380        if value.is_nan() {
381            // NaN sorts after all other values
382            self.buffer
383                .extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
384            return Ok(());
385        }
386
387        let bits = value.to_bits();
388
389        // Adjust for proper ordering of IEEE 754 floats
390        let adjusted = if (bits & 0x8000_0000_0000_0000) == 0 {
391            // Positive: add sign bit offset
392            bits | 0x8000_0000_0000_0000
393        } else {
394            // Negative: flip all bits
395            !bits
396        };
397
398        self.buffer.extend_from_slice(&adjusted.to_be_bytes());
399        Ok(())
400    }
401
402    /// Encode blob (binary data) with proper escaping
403    fn encode_blob(&mut self, bytes: &[u8]) -> Result<()> {
404        self.buffer.push(type_prefixes::BLOB);
405
406        // Encode raw bytes with proper escaping
407        for &byte in bytes {
408            match byte {
409                0x00 => self
410                    .buffer
411                    .extend_from_slice(escape_sequences::ESCAPED_NULL),
412                0xFF => self
413                    .buffer
414                    .extend_from_slice(escape_sequences::ESCAPED_ESCAPE),
415                _ => self.buffer.push(byte),
416            }
417        }
418
419        // Add terminator
420        self.buffer.push(type_prefixes::TERMINATOR);
421        Ok(())
422    }
423
424    /// Encode list with element-by-element encoding and depth tracking
425    fn encode_list_with_depth(&mut self, items: &[Value], depth: usize) -> Result<()> {
426        self.buffer.push(type_prefixes::LIST);
427
428        // Encode length as varint if configured
429        if self.config.use_varint_encoding {
430            self.encode_varint(items.len() as u64)?;
431        } else {
432            self.buffer
433                .extend_from_slice(&(items.len() as u32).to_be_bytes());
434        }
435
436        // Encode each element with separator
437        for (i, item) in items.iter().enumerate() {
438            if i > 0 {
439                self.buffer.push(type_prefixes::SEPARATOR);
440            }
441            self.encode_value_to_buffer_with_depth(item, depth)?;
442        }
443
444        // Add terminator
445        self.buffer.push(type_prefixes::TERMINATOR);
446        Ok(())
447    }
448
449    /// Backward compatibility wrapper
450    #[allow(dead_code)]
451    fn encode_list(&mut self, items: &[Value]) -> Result<()> {
452        self.encode_list_with_depth(items, 1)
453    }
454
455    /// Encode set (sorted for deterministic ordering) with depth tracking
456    fn encode_set_with_depth(&mut self, items: &[Value], _depth: usize) -> Result<()> {
457        self.buffer.push(type_prefixes::SET);
458
459        // For byte-comparable encoding, we need to sort the encoded items
460        let mut encoded_items = Vec::new();
461
462        for item in items {
463            let mut encoder = ByteComparableEncoder::with_config(self.config.clone());
464            let encoded = encoder.encode_value(item)?;
465            encoded_items.push(encoded);
466        }
467
468        // Sort encoded items lexicographically for deterministic ordering
469        encoded_items.sort();
470
471        // Encode length
472        if self.config.use_varint_encoding {
473            self.encode_varint(encoded_items.len() as u64)?;
474        } else {
475            self.buffer
476                .extend_from_slice(&(encoded_items.len() as u32).to_be_bytes());
477        }
478
479        // Add sorted encoded items with separators
480        for (i, encoded_item) in encoded_items.iter().enumerate() {
481            if i > 0 {
482                self.buffer.push(type_prefixes::SEPARATOR);
483            }
484            self.buffer.extend_from_slice(encoded_item);
485        }
486
487        // Add terminator
488        self.buffer.push(type_prefixes::TERMINATOR);
489        Ok(())
490    }
491
492    /// Backward compatibility wrapper
493    #[allow(dead_code)]
494    fn encode_set(&mut self, items: &[Value]) -> Result<()> {
495        self.encode_set_with_depth(items, 1)
496    }
497
498    /// Encode map from Vec of tuples with sorted key-value pairs and depth tracking
499    fn encode_map_with_depth(&mut self, map: &Vec<(Value, Value)>, _depth: usize) -> Result<()> {
500        self.buffer.push(type_prefixes::MAP);
501
502        // Encode key-value pairs and sort by encoded keys
503        let mut encoded_pairs = Vec::new();
504
505        for (key, value) in map {
506            let mut key_encoder = ByteComparableEncoder::with_config(self.config.clone());
507            let encoded_key = key_encoder.encode_value(key)?;
508
509            let mut value_encoder = ByteComparableEncoder::with_config(self.config.clone());
510            let encoded_value = value_encoder.encode_value(value)?;
511
512            encoded_pairs.push((encoded_key, encoded_value));
513        }
514
515        // Sort by encoded keys for deterministic ordering
516        encoded_pairs.sort_by(|a, b| a.0.cmp(&b.0));
517
518        // Encode length
519        if self.config.use_varint_encoding {
520            self.encode_varint(encoded_pairs.len() as u64)?;
521        } else {
522            self.buffer
523                .extend_from_slice(&(encoded_pairs.len() as u32).to_be_bytes());
524        }
525
526        // Add sorted pairs with separators
527        for (i, (encoded_key, encoded_value)) in encoded_pairs.iter().enumerate() {
528            if i > 0 {
529                self.buffer.push(type_prefixes::SEPARATOR);
530            }
531
532            // Encode key-value pair
533            self.buffer.extend_from_slice(encoded_key);
534            self.buffer.push(type_prefixes::SEPARATOR);
535            self.buffer.extend_from_slice(encoded_value);
536        }
537
538        // Add terminator
539        self.buffer.push(type_prefixes::TERMINATOR);
540        Ok(())
541    }
542
543    /// Backward compatibility wrapper
544    #[allow(dead_code)]
545    fn encode_map_vec(&mut self, map: &Vec<(Value, Value)>) -> Result<()> {
546        self.encode_map_with_depth(map, 1)
547    }
548
549    /// Encode tuple with positional fields and depth tracking
550    fn encode_tuple_with_depth(&mut self, items: &[Value], depth: usize) -> Result<()> {
551        self.buffer.push(type_prefixes::TUPLE);
552
553        // Encode length
554        if self.config.use_varint_encoding {
555            self.encode_varint(items.len() as u64)?;
556        } else {
557            self.buffer
558                .extend_from_slice(&(items.len() as u32).to_be_bytes());
559        }
560
561        // Encode each field with separator (order is significant for tuples)
562        for (i, item) in items.iter().enumerate() {
563            if i > 0 {
564                self.buffer.push(type_prefixes::SEPARATOR);
565            }
566            self.encode_value_to_buffer_with_depth(item, depth)?;
567        }
568
569        // Add terminator
570        self.buffer.push(type_prefixes::TERMINATOR);
571        Ok(())
572    }
573
574    /// Encode UDT (User Defined Type) with field ordering and depth tracking
575    fn encode_udt_with_depth(&mut self, udt: &UdtValue, depth: usize) -> Result<()> {
576        self.buffer.push(type_prefixes::UDT);
577
578        // Encode type name and keyspace for disambiguation
579        self.encode_text(&udt.keyspace)?;
580        self.encode_text(&udt.type_name)?;
581
582        // Encode field count
583        if self.config.use_varint_encoding {
584            self.encode_varint(udt.fields.len() as u64)?;
585        } else {
586            self.buffer
587                .extend_from_slice(&(udt.fields.len() as u32).to_be_bytes());
588        }
589
590        // Encode fields in schema order (important for UDTs)
591        for (i, field) in udt.fields.iter().enumerate() {
592            if i > 0 {
593                self.buffer.push(type_prefixes::SEPARATOR);
594            }
595
596            // Encode field name
597            self.encode_text(&field.name)?;
598            self.buffer.push(type_prefixes::SEPARATOR);
599
600            // Encode field value (null if None)
601            match &field.value {
602                Some(value) => self.encode_value_to_buffer_with_depth(value, depth)?,
603                None => self.encode_null()?,
604            }
605        }
606
607        // Add terminator
608        self.buffer.push(type_prefixes::TERMINATOR);
609        Ok(())
610    }
611
612    /// Encode variable-length integer (varint)
613    fn encode_varint(&mut self, mut value: u64) -> Result<()> {
614        while value >= 0x80 {
615            self.buffer.push((value & 0xFF) as u8 | 0x80);
616            value >>= 7;
617        }
618        self.buffer.push(value as u8);
619        Ok(())
620    }
621}
622
/// Byte-comparable key decoder (for debugging/testing)
pub struct ByteComparableDecoder;

impl ByteComparableDecoder {
    /// Render an encoded key as a human-readable string (best effort).
    ///
    /// * Empty input yields `<empty>`.
    /// * If the bytes are valid UTF-8 and at least 70% of the characters are
    ///   ASCII graphic or ASCII whitespace, the trimmed text is shown in
    ///   quotes followed by the hex dump in parentheses.
    /// * Otherwise any runs of printable ASCII (graphic characters or spaces)
    ///   are joined with single spaces and shown the same way.
    /// * If there is no printable run at all, only `0x` plus the hex dump is
    ///   returned.
    pub fn decode_key_debug(encoded: &[u8]) -> String {
        if encoded.is_empty() {
            return String::from("<empty>");
        }

        // Space-separated, lower-case, two-digit hex dump of every byte.
        let mut hex = String::with_capacity(encoded.len() * 3);
        for (idx, byte) in encoded.iter().enumerate() {
            if idx != 0 {
                hex.push(' ');
            }
            hex.push_str(&format!("{:02x}", byte));
        }

        // First attempt: the whole key reads as mostly-printable text.
        if let Ok(text) = std::str::from_utf8(encoded) {
            let mut total = 0usize;
            let mut printable = 0usize;
            for ch in text.chars() {
                total += 1;
                if ch.is_ascii_graphic() || ch.is_ascii_whitespace() {
                    printable += 1;
                }
            }

            if total > 0 && (printable as f32 / total as f32) >= 0.7 {
                // Drop trailing NULs, then surrounding whitespace.
                let trimmed = text.trim_end_matches('\0').trim();
                if !trimmed.is_empty() {
                    return format!("\"{}\" ({})", trimmed, hex);
                }
            }
        }

        // Second attempt: pull printable ASCII runs out of binary data.
        let runs: Vec<String> = encoded
            .split(|byte| !(byte.is_ascii_graphic() || *byte == b' '))
            .filter(|run| !run.is_empty())
            .map(|run| run.iter().map(|&byte| char::from(byte)).collect())
            .collect();

        if runs.is_empty() {
            format!("0x{}", hex)
        } else {
            format!("\"{}\" ({})", runs.join(" "), hex)
        }
    }
}
683
684/// Batch encoder for efficient encoding of multiple values
685pub struct BatchEncoder {
686    encoder: ByteComparableEncoder,
687    batch_buffer: Vec<Vec<u8>>,
688}
689
690impl Default for BatchEncoder {
691    fn default() -> Self {
692        Self::new()
693    }
694}
695
696impl BatchEncoder {
697    /// Create new batch encoder
698    pub fn new() -> Self {
699        Self {
700            encoder: ByteComparableEncoder::new(),
701            batch_buffer: Vec::new(),
702        }
703    }
704
705    /// Encode a batch of values efficiently
706    pub fn encode_batch(&mut self, values: &[Value]) -> Result<Vec<Vec<u8>>> {
707        self.batch_buffer.clear();
708        self.batch_buffer.reserve(values.len());
709
710        for value in values {
711            let encoded = self.encoder.encode_value(value)?;
712            self.batch_buffer.push(encoded);
713        }
714
715        Ok(self.batch_buffer.clone())
716    }
717
718    /// Clear the batch buffer
719    pub fn clear(&mut self) {
720        self.batch_buffer.clear();
721    }
722}
723
/// Performance statistics for the encoder
///
/// A point-in-time snapshot produced by `ByteComparableEncoder::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct EncoderStats {
    /// Current buffer capacity
    pub buffer_capacity: usize,
    /// Current buffer size
    pub buffer_size: usize,
    /// Number of encodings performed
    // NOTE: `get_stats` in this file always reports 0 here; nothing counts
    // encodings yet.
    pub encodings_performed: u64,
    /// Total bytes encoded
    // NOTE: `get_stats` in this file fills this with the current buffer
    // length, not a running total.
    pub total_bytes_encoded: u64,
}
736
737impl ByteComparableEncoder {
738    /// Reserve capacity in the internal buffer
739    pub fn reserve(&mut self, additional: usize) {
740        self.buffer.reserve(additional);
741    }
742
743    /// Get performance statistics
744    pub fn get_stats(&self) -> EncoderStats {
745        EncoderStats {
746            buffer_capacity: self.buffer.capacity(),
747            buffer_size: self.buffer.len(),
748            encodings_performed: 0, // Would need to track this in practice
749            total_bytes_encoded: self.buffer.len() as u64,
750        }
751    }
752
753    /// Validate an encoded key for correctness
754    pub fn validate_encoded_key(&self, encoded: &[u8]) -> Result<()> {
755        if encoded.is_empty() {
756            return Err(BtiError::InvalidByteComparableKey("Empty encoded key".to_string()).into());
757        }
758
759        let type_prefix = encoded[0];
760
761        // Validate type prefix
762        match type_prefix {
763            type_prefixes::NULL => {
764                if encoded.len() > 2 {
765                    return Err(BtiError::InvalidByteComparableKey(
766                        "Null value too long".to_string(),
767                    )
768                    .into());
769                }
770            }
771            type_prefixes::BOOLEAN_FALSE | type_prefixes::BOOLEAN_TRUE => {
772                if encoded.len() != 1 {
773                    return Err(BtiError::InvalidByteComparableKey(
774                        "Boolean value should be exactly 1 byte".to_string(),
775                    )
776                    .into());
777                }
778            }
779            type_prefixes::TINYINT => {
780                if encoded.len() != 2 {
781                    return Err(BtiError::InvalidByteComparableKey(
782                        "TinyInt should be exactly 2 bytes".to_string(),
783                    )
784                    .into());
785                }
786            }
787            type_prefixes::SMALLINT => {
788                if encoded.len() != 3 {
789                    return Err(BtiError::InvalidByteComparableKey(
790                        "SmallInt should be exactly 3 bytes".to_string(),
791                    )
792                    .into());
793                }
794            }
795            type_prefixes::INTEGER => {
796                if encoded.len() != 5 {
797                    return Err(BtiError::InvalidByteComparableKey(
798                        "Integer should be exactly 5 bytes".to_string(),
799                    )
800                    .into());
801                }
802            }
803            type_prefixes::BIGINT | type_prefixes::TIMESTAMP => {
804                if encoded.len() != 9 {
805                    return Err(BtiError::InvalidByteComparableKey(
806                        "BigInt/Timestamp should be exactly 9 bytes".to_string(),
807                    )
808                    .into());
809                }
810            }
811            type_prefixes::FLOAT => {
812                if encoded.len() != 5 {
813                    return Err(BtiError::InvalidByteComparableKey(
814                        "Float should be exactly 5 bytes".to_string(),
815                    )
816                    .into());
817                }
818            }
819            type_prefixes::DOUBLE => {
820                if encoded.len() != 9 {
821                    return Err(BtiError::InvalidByteComparableKey(
822                        "Double should be exactly 9 bytes".to_string(),
823                    )
824                    .into());
825                }
826            }
827            type_prefixes::UUID => {
828                if encoded.len() != 17 {
829                    return Err(BtiError::InvalidByteComparableKey(
830                        "UUID should be exactly 17 bytes".to_string(),
831                    )
832                    .into());
833                }
834            }
835            type_prefixes::TEXT | type_prefixes::BLOB => {
836                if encoded.len() < 2 || encoded[encoded.len() - 1] != type_prefixes::TERMINATOR {
837                    return Err(BtiError::InvalidByteComparableKey(
838                        "Text/Blob should end with terminator".to_string(),
839                    )
840                    .into());
841                }
842            }
843            type_prefixes::LIST
844            | type_prefixes::SET
845            | type_prefixes::MAP
846            | type_prefixes::TUPLE
847            | type_prefixes::UDT => {
848                if encoded.len() < 2 || encoded[encoded.len() - 1] != type_prefixes::TERMINATOR {
849                    return Err(BtiError::InvalidByteComparableKey(
850                        "Collection/Complex type should end with terminator".to_string(),
851                    )
852                    .into());
853                }
854            }
855            _ => {
856                return Err(BtiError::InvalidByteComparableKey(format!(
857                    "Unknown type prefix: 0x{:02x}",
858                    type_prefix
859                ))
860                .into());
861            }
862        }
863
864        Ok(())
865    }
866}
867
/// Unit tests for the byte-comparable encoder.
///
/// Most tests encode a handful of values and assert that plain `<` / `<=`
/// comparison of the encoded byte sequences yields the expected order.
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Encoded text values compare the same way the source strings do.
    #[test]
    fn test_text_encoding() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_text = |s: &str| enc.encode_value(&Value::Text(s.to_string())).unwrap();

        let a = encode_text("a");
        let b = encode_text("b");
        let aa = encode_text("aa");

        // "a" < "aa" < "b", exactly as with ordinary string comparison.
        assert!(a < b);
        assert!(a < aa);
        assert!(aa < b);
    }

    /// Negative, zero and positive integers keep their numeric order once encoded.
    #[test]
    fn test_integer_encoding() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_int = |n| enc.encode_value(&Value::Integer(n)).unwrap();

        let negative = encode_int(-100);
        let zero = encode_int(0);
        let positive = encode_int(100);

        assert!(negative < zero);
        assert!(zero < positive);
    }

    /// The encoding of `false` sorts before the encoding of `true`.
    #[test]
    fn test_boolean_encoding() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_bool = |flag: bool| enc.encode_value(&Value::Boolean(flag)).unwrap();

        let off = encode_bool(false);
        let on = encode_bool(true);

        assert!(off < on);
    }

    /// Two UUIDs that differ only in their last byte keep their relative order.
    #[test]
    fn test_uuid_encoding() {
        let mut enc = ByteComparableEncoder::new();

        let lower = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let higher = Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap();

        let mut encode_uuid = |id: &Uuid| enc.encode_value(&Value::Uuid(*id.as_bytes())).unwrap();
        let lower_bytes = encode_uuid(&lower);
        let higher_bytes = encode_uuid(&higher);

        assert!(lower_bytes < higher_bytes);
    }

    /// Composite keys order by their first component, then by their second.
    #[test]
    fn test_composite_key_encoding() {
        let mut enc = ByteComparableEncoder::new();

        // Builds a two-component key: (text partition, integer clustering).
        let make_key = |partition: &str, clustering| {
            vec![
                Value::Text(partition.to_string()),
                Value::Integer(clustering),
            ]
        };
        let p1_c1 = make_key("partition1", 1);
        let p1_c2 = make_key("partition1", 2);
        let p2_c1 = make_key("partition2", 1);

        let p1_c1_bytes = enc.encode_composite_key(&p1_c1).unwrap();
        let p1_c2_bytes = enc.encode_composite_key(&p1_c2).unwrap();
        let p2_c1_bytes = enc.encode_composite_key(&p2_c1).unwrap();

        // Same partition: the clustering value decides.
        assert!(p1_c1_bytes < p1_c2_bytes);
        // Different partition: the partition value decides.
        assert!(p1_c2_bytes < p2_c1_bytes);
    }

    /// A list that is a strict prefix of another list sorts before it.
    #[test]
    fn test_list_encoding() {
        let mut enc = ByteComparableEncoder::new();

        let shorter = Value::List((1..=2).map(Value::Integer).collect());
        let longer = Value::List((1..=3).map(Value::Integer).collect());

        let shorter_bytes = enc.encode_value(&shorter).unwrap();
        let longer_bytes = enc.encode_value(&longer).unwrap();

        assert!(shorter_bytes < longer_bytes);
    }

    /// Infinities and ordinary floats sort as -inf < -1.0 < 0.0 < 1.0 < +inf.
    #[test]
    fn test_float_special_values() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_f64 = |x: f64| enc.encode_value(&Value::Float(x)).unwrap();

        let minus_infinity = encode_f64(f64::NEG_INFINITY);
        let minus_one = encode_f64(-1.0);
        let origin = encode_f64(0.0);
        let plus_one = encode_f64(1.0);
        let plus_infinity = encode_f64(f64::INFINITY);

        assert!(minus_infinity < minus_one);
        assert!(minus_one < origin);
        assert!(origin < plus_one);
        assert!(plus_one < plus_infinity);
    }

    /// The debug decoder shows readable text as-is and other bytes with a `0x` prefix.
    #[test]
    fn test_decode_key_debug() {
        let rendered_text = ByteComparableDecoder::decode_key_debug(b"hello\0");
        assert!(rendered_text.contains("hello"));

        let rendered_binary = ByteComparableDecoder::decode_key_debug(&[0xFF, 0xFE, 0xFD]);
        assert!(rendered_binary.starts_with("0x"));
    }

    /// Reusing one encoder for several values yields independent results.
    #[test]
    fn test_encoder_reuse() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_text = |s: &str| enc.encode_value(&Value::Text(s.to_string())).unwrap();

        let first = encode_text("test1");
        let second = encode_text("test2");

        // The second result must not be affected by the first call.
        assert_ne!(first, second);
        assert!(first < second);
    }

    /// A custom configuration passed to `with_config` is visible through `config()`.
    #[test]
    fn test_encoder_config() {
        let encoder = ByteComparableEncoder::with_config(EncoderConfig {
            use_varint_encoding: false,
            max_nesting_depth: 16,
            enable_prefix_compression: false,
            strict_compliance: false,
        });

        let applied = encoder.config();
        assert!(!applied.use_varint_encoding);
        assert_eq!(applied.max_nesting_depth, 16);
    }

    /// Encoding a value nested deeper than `max_nesting_depth` returns an error.
    #[test]
    fn test_max_nesting_depth() {
        let shallow_limit = EncoderConfig {
            max_nesting_depth: 2,
            ..Default::default()
        };
        let mut enc = ByteComparableEncoder::with_config(shallow_limit);

        // Wrap an integer in three list layers: List[List[List[1]]].
        let mut nested = Value::Integer(1);
        for _ in 0..3 {
            nested = Value::List(vec![nested]);
        }

        let outcome = enc.encode_value(&nested);
        assert!(outcome.is_err());
    }

    /// Batch encoding produces one result per value, equal to encoding each value alone.
    #[test]
    fn test_batch_encoder() {
        let mut batch_encoder = BatchEncoder::new();

        let values = vec![
            Value::Integer(1),
            Value::Text("hello".to_string()),
            Value::Boolean(true),
        ];

        let batch = batch_encoder.encode_batch(&values).unwrap();
        assert_eq!(batch.len(), 3);

        // Cross-check every batch entry against a standalone encoder.
        let mut reference = ByteComparableEncoder::new();
        for (batched, value) in batch.iter().zip(values.iter()) {
            let expected = reference.encode_value(value).unwrap();
            assert_eq!(*batched, expected);
        }
    }

    /// Values of different types sort as null < boolean < integer < text.
    #[test]
    fn test_ordering_across_types() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode = |v: Value| enc.encode_value(&v).unwrap();

        let null_bytes = encode(Value::Null);
        let bool_bytes = encode(Value::Boolean(false));
        let int_bytes = encode(Value::Integer(0));
        let text_bytes = encode(Value::Text("".to_string()));

        assert!(null_bytes < bool_bytes);
        assert!(bool_bytes < int_bytes);
        assert!(int_bytes < text_bytes);
    }

    /// `validate_encoded_key` accepts minimal well-formed keys and rejects malformed ones.
    #[test]
    fn test_validation() {
        let encoder = ByteComparableEncoder::new();

        // Well-formed: a lone NULL prefix and a lone BOOLEAN_TRUE prefix.
        let null_only = encoder.validate_encoded_key(&[type_prefixes::NULL]);
        assert!(null_only.is_ok());
        let true_only = encoder.validate_encoded_key(&[type_prefixes::BOOLEAN_TRUE]);
        assert!(true_only.is_ok());

        // Malformed: no bytes at all.
        let empty = encoder.validate_encoded_key(&[]);
        assert!(empty.is_err());
        // Malformed: a NULL prefix followed by extra bytes.
        let padded = encoder.validate_encoded_key(&[type_prefixes::NULL, 0x00, 0x00, 0x00]);
        assert!(padded.is_err());
    }

    /// `reserve` is reflected in the reported capacity, and encoding grows the reported size.
    #[test]
    fn test_performance_stats() {
        let mut enc = ByteComparableEncoder::new();
        enc.reserve(1024);

        let before = enc.get_stats();
        assert!(before.buffer_capacity >= 1024);
        assert_eq!(before.buffer_size, 0);

        // The encoded bytes are not needed here; only the stats afterwards matter.
        enc.encode_value(&Value::Text("test".to_string())).unwrap();

        let after = enc.get_stats();
        assert!(after.buffer_size > 0);
    }

    /// Timestamps carry the TIMESTAMP prefix and sort chronologically around zero.
    #[test]
    fn test_timestamp_encoding() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_ts = |t| enc.encode_value(&Value::Timestamp(t)).unwrap();

        let before_epoch = encode_ts(-1000);
        let at_epoch = encode_ts(0);
        let after_epoch = encode_ts(1000);

        // The first byte of every encoding is the timestamp type prefix.
        assert_eq!(before_epoch[0], type_prefixes::TIMESTAMP);
        assert_eq!(at_epoch[0], type_prefixes::TIMESTAMP);
        assert_eq!(after_epoch[0], type_prefixes::TIMESTAMP);

        assert!(before_epoch < at_epoch);
        assert!(at_epoch < after_epoch);
    }

    /// Blobs carry the BLOB prefix, sort bytewise, and escape embedded zero bytes.
    #[test]
    fn test_blob_encoding() {
        let mut enc = ByteComparableEncoder::new();
        let mut encode_blob = |raw: &[u8]| enc.encode_value(&Value::Blob(raw.to_vec())).unwrap();

        let low = encode_blob(&[0x01, 0x02]);
        let high = encode_blob(&[0x01, 0x03]);
        let with_zero = encode_blob(&[0x01, 0x00, 0x02]);

        assert_eq!(low[0], type_prefixes::BLOB);
        assert_eq!(high[0], type_prefixes::BLOB);

        assert!(low < high);

        // The embedded 0x00 must show up as the two-byte escaped-null sequence.
        let has_escaped_null = with_zero
            .windows(2)
            .any(|pair| pair == escape_sequences::ESCAPED_NULL);
        assert!(has_escaped_null);
    }

    /// Encodes a fixed sequence of mixed-type values and checks that each encoding
    /// is less than or equal to the next one in the sequence.
    #[test]
    fn test_comprehensive_ordering() {
        let mut enc = ByteComparableEncoder::new();

        // Listed in the order their encodings are expected to sort.
        let ordered_values = vec![
            Value::Null,
            Value::Boolean(false),
            Value::Boolean(true),
            Value::TinyInt(-1),
            Value::TinyInt(0),
            Value::TinyInt(1),
            Value::SmallInt(-100),
            Value::SmallInt(100),
            Value::Integer(-1000),
            Value::Integer(1000),
            Value::BigInt(-10000),
            Value::BigInt(10000),
            Value::Float32(-1.0),
            Value::Float32(1.0),
            Value::Float(-1.0),
            Value::Float(1.0),
            Value::Text("a".to_string()),
            Value::Text("z".to_string()),
            Value::Blob(vec![0x01]),
            Value::Blob(vec![0xFF]),
            Value::Uuid([0u8; 16]),
            Value::Uuid([0xFFu8; 16]),
            Value::Timestamp(-1000),
            Value::Timestamp(1000),
        ];

        let encoded: Vec<_> = ordered_values
            .iter()
            .map(|value| enc.encode_value(value).unwrap())
            .collect();

        // Compare every adjacent pair; `idx` is the position of the left element.
        for (idx, pair) in encoded.windows(2).enumerate() {
            assert!(
                pair[0] <= pair[1],
                "Ordering violation at index {} and {}",
                idx,
                idx + 1
            );
        }
    }
}