Skip to main content

kimberlite_query/
key_encoder.rs

1//! Lexicographic key encoding for B+tree lookups.
2//!
3//! This module provides encoding functions that preserve ordering when
4//! the encoded bytes are compared lexicographically. This enables efficient
5//! range scans on the B+tree store.
6//!
7//! # Encoding Strategies
8//!
9//! - **`BigInt`**: Sign-flip encoding (XOR with 0x80 on high byte)
10//! - **Text/Bytes**: Null-terminated with escape sequences (0x00 → 0x00 0xFF)
11//! - **Timestamp**: Big-endian u64 (naturally lexicographic)
12//! - **Boolean**: 0x00 for false, 0x01 for true
13//!
14//! The null-termination approach (inspired by `FoundationDB`) preserves lexicographic
15//! ordering while handling embedded nulls correctly. The escape sequence 0x00 0xFF
16//! sorts after the terminator 0x00, maintaining proper ordering.
17
18use bytes::Bytes;
19use kimberlite_store::Key;
20use kimberlite_types::Timestamp;
21
22use crate::value::Value;
23
/// Encodes a `TinyInt` (i8) so that byte order matches numeric order.
///
/// The sign bit is flipped, mapping `i8::MIN..=i8::MAX` onto `0x00..=0xFF`.
#[allow(clippy::cast_sign_loss)]
pub fn encode_tinyint(value: i8) -> [u8; 1] {
    const SIGN_BIT: u8 = 0x80;
    // The cast reinterprets the two's-complement bits; no value check is wanted.
    [(value as u8) ^ SIGN_BIT]
}
30
/// Decodes a `TinyInt` from its sign-flipped byte.
///
/// Inverse of the sign-flip encoding: flipping the top bit again restores
/// the original two's-complement value.
#[allow(dead_code)]
pub fn decode_tinyint(bytes: [u8; 1]) -> i8 {
    let [flipped] = bytes;
    i8::from_be_bytes([flipped ^ 0x80])
}
37
/// Encodes a `SmallInt` (i16) so that byte order matches numeric order.
///
/// The sign bit is flipped and the result is written big-endian.
#[allow(clippy::cast_sign_loss)]
pub fn encode_smallint(value: i16) -> [u8; 2] {
    const SIGN_BIT: u16 = 0x8000;
    // The cast reinterprets the two's-complement bits; no value check is wanted.
    let flipped = (value as u16) ^ SIGN_BIT;
    flipped.to_be_bytes()
}
44
/// Decodes a `SmallInt` from its sign-flipped big-endian bytes.
#[allow(dead_code)]
pub fn decode_smallint(bytes: [u8; 2]) -> i16 {
    let [high, low] = bytes;
    // Only the most significant byte carries the flipped sign bit.
    i16::from_be_bytes([high ^ 0x80, low])
}
51
/// Encodes an `Integer` (i32) so that byte order matches numeric order.
///
/// The sign bit is flipped and the result is written big-endian.
#[allow(clippy::cast_sign_loss)]
pub fn encode_integer(value: i32) -> [u8; 4] {
    const SIGN_BIT: u32 = 0x8000_0000;
    // The cast reinterprets the two's-complement bits; no value check is wanted.
    let flipped = (value as u32) ^ SIGN_BIT;
    flipped.to_be_bytes()
}
58
/// Decodes an `Integer` from its sign-flipped big-endian bytes.
#[allow(dead_code)]
pub fn decode_integer(bytes: [u8; 4]) -> i32 {
    let mut raw = bytes;
    // Only the most significant byte carries the flipped sign bit.
    raw[0] ^= 0x80;
    i32::from_be_bytes(raw)
}
65
/// Encodes a `BigInt` (i64) so that byte order matches numeric order.
///
/// Flipping the sign bit moves negative values below non-negative ones when
/// the big-endian bytes are compared lexicographically:
///
/// ```text
/// i64::MIN (-9223372036854775808) -> 0x00_00_00_00_00_00_00_00
/// -1                              -> 0x7F_FF_FF_FF_FF_FF_FF_FF
///  0                              -> 0x80_00_00_00_00_00_00_00
///  1                              -> 0x80_00_00_00_00_00_00_01
/// i64::MAX (9223372036854775807)  -> 0xFF_FF_FF_FF_FF_FF_FF_FF
/// ```
#[allow(clippy::cast_sign_loss)]
pub fn encode_bigint(value: i64) -> [u8; 8] {
    const SIGN_BIT: u64 = 0x8000_0000_0000_0000;
    // The cast reinterprets the two's-complement bits; no value check is wanted.
    let flipped = (value as u64) ^ SIGN_BIT;
    flipped.to_be_bytes()
}
85
/// Decodes a `BigInt` from its sign-flipped big-endian bytes.
#[allow(dead_code)]
pub fn decode_bigint(bytes: [u8; 8]) -> i64 {
    let mut raw = bytes;
    // Only the most significant byte carries the flipped sign bit.
    raw[0] ^= 0x80;
    i64::from_be_bytes(raw)
}
92
93/// Encodes a Timestamp for lexicographic ordering.
94///
95/// Timestamps are u64 nanoseconds, which are naturally ordered
96/// when stored as big-endian bytes.
97pub fn encode_timestamp(ts: Timestamp) -> [u8; 8] {
98    ts.as_nanos().to_be_bytes()
99}
100
101/// Decodes a Timestamp from big-endian encoding.
102#[allow(dead_code)]
103pub fn decode_timestamp(bytes: [u8; 8]) -> Timestamp {
104    Timestamp::from_nanos(u64::from_be_bytes(bytes))
105}
106
/// Encodes a `Real` (f64) for lexicographic ordering with total ordering.
///
/// NaN < -Inf < negative values < -0.0 < +0.0 < positive values < +Inf
///
/// Every NaN, regardless of sign or payload, is encoded as the all-zero key.
/// That key sorts below `-Inf` and makes all NaNs compare equal as keys.
/// Decoding the all-zero key yields a NaN, but not necessarily one with the
/// original sign/payload bits. All non-NaN values round-trip bit-exactly.
pub fn encode_real(value: f64) -> [u8; 8] {
    const SIGN_BIT: u64 = 1u64 << 63;

    // Without this, a positive-sign NaN (including `f64::NAN`) would sort
    // above +Inf and distinct payloads would produce distinct keys.
    // No non-NaN value maps to the all-zero key: it corresponds to the
    // all-ones bit pattern, which is itself a NaN.
    if value.is_nan() {
        return [0u8; 8];
    }

    let bits = value.to_bits();
    let key = if value.is_sign_negative() {
        // Negative: invert everything so larger magnitudes sort first.
        !bits
    } else {
        // Non-negative: set the top bit so these sort after all negatives.
        bits ^ SIGN_BIT
    };

    key.to_be_bytes()
}
123
/// Decodes a `Real` from its order-preserving key bytes.
///
/// A key with the top bit set came from a non-negative value (only the sign
/// bit was flipped); a key with the top bit clear came from a negative value
/// (every bit was inverted).
#[allow(dead_code)]
pub fn decode_real(bytes: [u8; 8]) -> f64 {
    const SIGN_BIT: u64 = 1u64 << 63;

    let key = u64::from_be_bytes(bytes);
    let bits = if key & SIGN_BIT != 0 {
        key ^ SIGN_BIT
    } else {
        !key
    };

    f64::from_bits(bits)
}
138
/// Encodes a `Decimal` (i128 mantissa, u8 scale) into 17 key bytes.
///
/// Layout: `[sign-flipped big-endian i128: 16 bytes][scale: 1 byte]`.
///
/// Keys compare by mantissa first and scale second; the bytes are not
/// normalised across scales.
#[allow(clippy::cast_sign_loss)]
pub fn encode_decimal(value: i128, scale: u8) -> [u8; 17] {
    const SIGN_BIT: u128 = 1u128 << 127;

    let mut out = [0u8; 17];
    let (mantissa, tail) = out.split_at_mut(16);
    // The cast reinterprets the two's-complement bits; no value check is wanted.
    mantissa.copy_from_slice(&((value as u128) ^ SIGN_BIT).to_be_bytes());
    tail[0] = scale;
    out
}
150
/// Decodes a `Decimal` from its 17-byte key form.
///
/// Returns `(mantissa, scale)`: the first 16 bytes are the sign-flipped
/// big-endian mantissa, the last byte is the scale.
#[allow(dead_code)]
pub fn decode_decimal(bytes: [u8; 17]) -> (i128, u8) {
    let mut mantissa = [0u8; 16];
    mantissa.copy_from_slice(&bytes[..16]);
    // Only the most significant byte carries the flipped sign bit.
    mantissa[0] ^= 0x80;
    (i128::from_be_bytes(mantissa), bytes[16])
}
161
162/// Encodes a `Date` (i32) for lexicographic ordering.
163#[allow(clippy::cast_sign_loss)]
164pub fn encode_date(value: i32) -> [u8; 4] {
165    encode_integer(value) // Same as Integer
166}
167
168/// Decodes a `Date` from sign-flip encoding.
169#[allow(dead_code)]
170pub fn decode_date(bytes: [u8; 4]) -> i32 {
171    decode_integer(bytes)
172}
173
/// Encodes a `Time` (i64) as plain big-endian bytes.
///
/// No sign flip is applied. Byte order therefore matches numeric order only
/// for non-negative inputs, which is what a nanoseconds-within-a-day value is
/// expected to be; a negative input would sort after every non-negative one.
pub fn encode_time(value: i64) -> [u8; 8] {
    i64::to_be_bytes(value)
}
180
/// Decodes a `Time` (i64) from plain big-endian bytes.
///
/// The bytes are read back verbatim; no sign flip is involved.
#[allow(dead_code)]
pub fn decode_time(bytes: [u8; 8]) -> i64 {
    i64::from_be_bytes(bytes)
}
186
/// Encodes a `Uuid` for lexicographic ordering.
///
/// The 16 bytes are used as the key unchanged, so keys order by the raw
/// byte-wise comparison of the UUID.
pub fn encode_uuid(value: [u8; 16]) -> [u8; 16] {
    value
}
193
/// Decodes a `Uuid` from its key bytes.
///
/// The key form is the raw UUID, so the bytes are returned unchanged.
#[allow(dead_code)]
pub fn decode_uuid(bytes: [u8; 16]) -> [u8; 16] {
    bytes
}
199
/// Encodes a boolean as a single byte: `0x00` for `false`, `0x01` for `true`.
pub fn encode_boolean(value: bool) -> [u8; 1] {
    if value {
        [0x01]
    } else {
        [0x00]
    }
}
204
/// Decodes a boolean from its key byte.
///
/// `0x00` is `false`; every other byte value is treated as `true`.
#[allow(dead_code)]
pub fn decode_boolean(byte: u8) -> bool {
    !matches!(byte, 0x00)
}
210
211/// Encodes a composite key from multiple values.
212///
213/// Each value is tagged and encoded to enable unambiguous decoding
214/// and to preserve lexicographic ordering.
215///
216/// # Encoding Format
217///
218/// For each value:
219/// - 1 byte: Type tag (0x00=Null, 0x01=BigInt, 0x02=Text, 0x03=Boolean, 0x04=Timestamp, 0x05=Bytes,
220///   0x06=Integer, 0x07=SmallInt, 0x08=TinyInt, 0x09=Real, 0x0A=Decimal,
221///   0x0B=Uuid, 0x0C=Json, 0x0D=Date, 0x0E=Time)
222/// - Variable: Encoded value
223///
224/// For variable-length types (Text, Bytes):
225/// - N bytes: Data with embedded nulls escaped as 0x00 0xFF
226/// - 1 byte: Terminator 0x00
227///
228/// # Panics
229///
230/// Panics if the value is a `Placeholder` or `Json` (JSON is not indexable).
231pub fn encode_key(values: &[Value]) -> Key {
232    let mut buf = Vec::with_capacity(64);
233
234    for value in values {
235        match value {
236            Value::Null => {
237                buf.push(0x00); // Type tag for NULL
238            }
239            Value::BigInt(v) => {
240                buf.push(0x01); // Type tag for BigInt
241                buf.extend_from_slice(&encode_bigint(*v));
242            }
243            Value::Text(s) => {
244                buf.push(0x02); // Type tag for Text
245                for &byte in s.as_bytes() {
246                    if byte == 0x00 {
247                        buf.push(0x00);
248                        buf.push(0xFF); // Escape embedded null
249                    } else {
250                        buf.push(byte);
251                    }
252                }
253                buf.push(0x00); // Terminator
254            }
255            Value::Boolean(b) => {
256                buf.push(0x03); // Type tag for Boolean
257                buf.extend_from_slice(&encode_boolean(*b));
258            }
259            Value::Timestamp(ts) => {
260                buf.push(0x04); // Type tag for Timestamp
261                buf.extend_from_slice(&encode_timestamp(*ts));
262            }
263            Value::Bytes(b) => {
264                buf.push(0x05); // Type tag for Bytes
265                for &byte in b {
266                    if byte == 0x00 {
267                        buf.push(0x00);
268                        buf.push(0xFF); // Escape embedded null
269                    } else {
270                        buf.push(byte);
271                    }
272                }
273                buf.push(0x00); // Terminator
274            }
275            Value::Integer(v) => {
276                buf.push(0x06); // Type tag for Integer
277                buf.extend_from_slice(&encode_integer(*v));
278            }
279            Value::SmallInt(v) => {
280                buf.push(0x07); // Type tag for SmallInt
281                buf.extend_from_slice(&encode_smallint(*v));
282            }
283            Value::TinyInt(v) => {
284                buf.push(0x08); // Type tag for TinyInt
285                buf.extend_from_slice(&encode_tinyint(*v));
286            }
287            Value::Real(v) => {
288                buf.push(0x09); // Type tag for Real
289                buf.extend_from_slice(&encode_real(*v));
290            }
291            Value::Decimal(v, scale) => {
292                buf.push(0x0A); // Type tag for Decimal
293                buf.extend_from_slice(&encode_decimal(*v, *scale));
294            }
295            Value::Uuid(u) => {
296                buf.push(0x0B); // Type tag for Uuid
297                buf.extend_from_slice(&encode_uuid(*u));
298            }
299            Value::Json(_) => {
300                panic!(
301                    "JSON values cannot be used in primary keys or indexes - they are not orderable"
302                )
303            }
304            Value::Date(d) => {
305                buf.push(0x0D); // Type tag for Date
306                buf.extend_from_slice(&encode_date(*d));
307            }
308            Value::Time(t) => {
309                buf.push(0x0E); // Type tag for Time
310                buf.extend_from_slice(&encode_time(*t));
311            }
312            Value::Placeholder(idx) => {
313                panic!("Cannot encode unbound placeholder ${idx} - bind parameters first")
314            }
315        }
316    }
317
318    Key::from(buf)
319}
320
321/// Decodes a composite key back into values.
322///
323/// # Panics
324///
325/// Panics if the encoded data is malformed.
326#[allow(dead_code)]
327/// Decodes a `BigInt` value from key bytes.
328#[inline]
329fn decode_bigint_value(bytes: &[u8], pos: &mut usize) -> Value {
330    debug_assert!(
331        *pos + 8 <= bytes.len(),
332        "insufficient bytes for BigInt at position {pos}"
333    );
334    let arr: [u8; 8] = bytes[*pos..*pos + 8]
335        .try_into()
336        .expect("BigInt decode failed");
337    *pos += 8;
338    Value::BigInt(decode_bigint(arr))
339}
340
341/// Decodes a Text value from key bytes.
342#[inline]
343fn decode_text_value(bytes: &[u8], pos: &mut usize) -> Value {
344    let mut result = Vec::new();
345
346    while *pos < bytes.len() {
347        debug_assert!(*pos <= bytes.len(), "position out of bounds");
348        let byte = bytes[*pos];
349        *pos += 1;
350
351        if byte == 0x00 {
352            if *pos < bytes.len() && bytes[*pos] == 0xFF {
353                result.push(0x00); // Escaped null
354                *pos += 1;
355            } else {
356                break; // Terminator
357            }
358        } else {
359            result.push(byte);
360        }
361    }
362
363    let s = std::str::from_utf8(&result).expect("Text decode failed: invalid UTF-8");
364    debug_assert!(
365        std::str::from_utf8(&result).is_ok(),
366        "decoded text must be valid UTF-8"
367    );
368    Value::Text(s.to_string())
369}
370
371/// Decodes a Boolean value from key bytes.
372#[inline]
373fn decode_boolean_value(bytes: &[u8], pos: &mut usize) -> Value {
374    debug_assert!(
375        *pos < bytes.len(),
376        "insufficient bytes for Boolean at position {pos}"
377    );
378    let b = decode_boolean(bytes[*pos]);
379    *pos += 1;
380    Value::Boolean(b)
381}
382
383/// Decodes a Timestamp value from key bytes.
384#[inline]
385fn decode_timestamp_value(bytes: &[u8], pos: &mut usize) -> Value {
386    debug_assert!(
387        *pos + 8 <= bytes.len(),
388        "insufficient bytes for Timestamp at position {pos}"
389    );
390    let arr: [u8; 8] = bytes[*pos..*pos + 8]
391        .try_into()
392        .expect("Timestamp decode failed");
393    *pos += 8;
394    Value::Timestamp(decode_timestamp(arr))
395}
396
397/// Decodes a Bytes value from key bytes.
398#[inline]
399fn decode_bytes_value(bytes: &[u8], pos: &mut usize) -> Value {
400    let mut result = Vec::new();
401
402    while *pos < bytes.len() {
403        debug_assert!(*pos <= bytes.len(), "position out of bounds");
404        let byte = bytes[*pos];
405        *pos += 1;
406
407        if byte == 0x00 {
408            if *pos < bytes.len() && bytes[*pos] == 0xFF {
409                result.push(0x00); // Escaped null
410                *pos += 1;
411            } else {
412                break; // Terminator
413            }
414        } else {
415            result.push(byte);
416        }
417    }
418
419    Value::Bytes(Bytes::from(result))
420}
421
422/// Decodes an Integer value from key bytes.
423#[inline]
424fn decode_integer_value(bytes: &[u8], pos: &mut usize) -> Value {
425    debug_assert!(
426        *pos + 4 <= bytes.len(),
427        "insufficient bytes for Integer at position {pos}"
428    );
429    let arr: [u8; 4] = bytes[*pos..*pos + 4]
430        .try_into()
431        .expect("Integer decode failed");
432    *pos += 4;
433    Value::Integer(decode_integer(arr))
434}
435
436/// Decodes a `SmallInt` value from key bytes.
437#[inline]
438fn decode_smallint_value(bytes: &[u8], pos: &mut usize) -> Value {
439    debug_assert!(
440        *pos + 2 <= bytes.len(),
441        "insufficient bytes for SmallInt at position {pos}"
442    );
443    let arr: [u8; 2] = bytes[*pos..*pos + 2]
444        .try_into()
445        .expect("SmallInt decode failed");
446    *pos += 2;
447    Value::SmallInt(decode_smallint(arr))
448}
449
450/// Decodes a `TinyInt` value from key bytes.
451#[inline]
452fn decode_tinyint_value(bytes: &[u8], pos: &mut usize) -> Value {
453    debug_assert!(
454        *pos < bytes.len(),
455        "insufficient bytes for TinyInt at position {pos}"
456    );
457    let arr: [u8; 1] = [bytes[*pos]];
458    *pos += 1;
459    Value::TinyInt(decode_tinyint(arr))
460}
461
462/// Decodes a Real value from key bytes.
463#[inline]
464fn decode_real_value(bytes: &[u8], pos: &mut usize) -> Value {
465    debug_assert!(
466        *pos + 8 <= bytes.len(),
467        "insufficient bytes for Real at position {pos}"
468    );
469    let arr: [u8; 8] = bytes[*pos..*pos + 8]
470        .try_into()
471        .expect("Real decode failed");
472    *pos += 8;
473    Value::Real(decode_real(arr))
474}
475
476/// Decodes a Decimal value from key bytes.
477#[inline]
478fn decode_decimal_value(bytes: &[u8], pos: &mut usize) -> Value {
479    debug_assert!(
480        *pos + 17 <= bytes.len(),
481        "insufficient bytes for Decimal at position {pos}"
482    );
483    let arr: [u8; 17] = bytes[*pos..*pos + 17]
484        .try_into()
485        .expect("Decimal decode failed");
486    *pos += 17;
487    let (val, scale) = decode_decimal(arr);
488    Value::Decimal(val, scale)
489}
490
491/// Decodes a Uuid value from key bytes.
492#[inline]
493fn decode_uuid_value(bytes: &[u8], pos: &mut usize) -> Value {
494    debug_assert!(
495        *pos + 16 <= bytes.len(),
496        "insufficient bytes for Uuid at position {pos}"
497    );
498    let arr: [u8; 16] = bytes[*pos..*pos + 16]
499        .try_into()
500        .expect("Uuid decode failed");
501    *pos += 16;
502    Value::Uuid(decode_uuid(arr))
503}
504
505/// Decodes a Date value from key bytes.
506#[inline]
507fn decode_date_value(bytes: &[u8], pos: &mut usize) -> Value {
508    debug_assert!(
509        *pos + 4 <= bytes.len(),
510        "insufficient bytes for Date at position {pos}"
511    );
512    let arr: [u8; 4] = bytes[*pos..*pos + 4]
513        .try_into()
514        .expect("Date decode failed");
515    *pos += 4;
516    Value::Date(decode_date(arr))
517}
518
519/// Decodes a Time value from key bytes.
520#[inline]
521fn decode_time_value(bytes: &[u8], pos: &mut usize) -> Value {
522    debug_assert!(
523        *pos + 8 <= bytes.len(),
524        "insufficient bytes for Time at position {pos}"
525    );
526    let arr: [u8; 8] = bytes[*pos..*pos + 8]
527        .try_into()
528        .expect("Time decode failed");
529    *pos += 8;
530    Value::Time(decode_time(arr))
531}
532
533pub fn decode_key(key: &Key) -> Vec<Value> {
534    let bytes = key.as_bytes();
535    let mut values = Vec::new();
536    let mut pos = 0;
537
538    while pos < bytes.len() {
539        let tag = bytes[pos];
540        pos += 1;
541
542        let value = match tag {
543            0x00 => Value::Null,
544            0x01 => decode_bigint_value(bytes, &mut pos),
545            0x02 => decode_text_value(bytes, &mut pos),
546            0x03 => decode_boolean_value(bytes, &mut pos),
547            0x04 => decode_timestamp_value(bytes, &mut pos),
548            0x05 => decode_bytes_value(bytes, &mut pos),
549            0x06 => decode_integer_value(bytes, &mut pos),
550            0x07 => decode_smallint_value(bytes, &mut pos),
551            0x08 => decode_tinyint_value(bytes, &mut pos),
552            0x09 => decode_real_value(bytes, &mut pos),
553            0x0A => decode_decimal_value(bytes, &mut pos),
554            0x0B => decode_uuid_value(bytes, &mut pos),
555            0x0C => panic!("JSON values cannot be decoded from keys - they are not indexable"),
556            0x0D => decode_date_value(bytes, &mut pos),
557            0x0E => decode_time_value(bytes, &mut pos),
558            _ => panic!("unknown type tag {tag:#04x} at position {}", pos - 1),
559        };
560
561        values.push(value);
562    }
563
564    values
565}
566
567/// Creates a key that represents the minimum value for a type.
568///
569/// Useful for constructing range scan bounds.
570#[allow(dead_code)]
571pub fn min_key_for_type(count: usize) -> Key {
572    let values: Vec<Value> = (0..count).map(|_| Value::Null).collect();
573    encode_key(&values)
574}
575
576/// Creates a key that is greater than any key starting with the given prefix.
577///
578/// Used for exclusive upper bounds in range scans.
579pub fn successor_key(key: &Key) -> Key {
580    let bytes = key.as_bytes();
581    let mut result = bytes.to_vec();
582
583    // Increment the last byte, carrying as needed
584    for i in (0..result.len()).rev() {
585        if result[i] < 0xFF {
586            result[i] += 1;
587            return Key::from(result);
588        }
589        result[i] = 0x00;
590    }
591
592    // All bytes were 0xFF, append 0x00 to make it larger
593    result.push(0x00);
594    Key::from(result)
595}
596
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column key holding `text`.
    fn text_key(text: &str) -> Key {
        encode_key(&[Value::Text(text.to_string())])
    }

    /// Builds a single-column key holding `data`.
    fn bytes_key(data: &'static [u8]) -> Key {
        encode_key(&[Value::Bytes(Bytes::from(data))])
    }

    /// Asserts that each adjacent pair of `keys` compares the same way as the
    /// corresponding adjacent pair of `cases`.
    fn assert_order_preserved<T: Ord + std::fmt::Debug>(cases: &[T], keys: &[Key]) {
        for (case_pair, key_pair) in cases.windows(2).zip(keys.windows(2)) {
            assert_eq!(
                case_pair[0].cmp(&case_pair[1]),
                key_pair[0].cmp(&key_pair[1]),
                "Ordering not preserved between {:?} and {:?}",
                case_pair[0],
                case_pair[1]
            );
        }
    }

    #[test]
    fn test_bigint_encoding_preserves_order() {
        // Listed in ascending numeric order.
        let values = [
            i64::MIN,
            i64::MIN + 1,
            -1000,
            -1,
            0,
            1,
            1000,
            i64::MAX - 1,
            i64::MAX,
        ];

        let encoded: Vec<_> = values.iter().copied().map(encode_bigint).collect();
        assert!(
            encoded.windows(2).all(|pair| pair[0] <= pair[1]),
            "BigInt encoding should preserve ordering"
        );

        // Verify decode round-trips
        for v in values {
            assert_eq!(decode_bigint(encode_bigint(v)), v);
        }
    }

    #[test]
    fn test_timestamp_encoding_preserves_order() {
        // Listed in ascending numeric order.
        let values = [0u64, 1, 1000, u64::MAX / 2, u64::MAX];

        let encoded: Vec<_> = values
            .iter()
            .map(|&nanos| encode_timestamp(Timestamp::from_nanos(nanos)))
            .collect();
        assert!(
            encoded.windows(2).all(|pair| pair[0] <= pair[1]),
            "Timestamp encoding should preserve ordering"
        );

        // Verify decode round-trips
        for nanos in values {
            let ts = Timestamp::from_nanos(nanos);
            assert_eq!(decode_timestamp(encode_timestamp(ts)), ts);
        }
    }

    #[test]
    fn test_composite_key_round_trip() {
        let values = vec![
            Value::BigInt(42),
            Value::Text("hello".to_string()),
            Value::Boolean(true),
            Value::Timestamp(Timestamp::from_nanos(12345)),
            Value::Bytes(Bytes::from_static(b"data")),
        ];

        let decoded = decode_key(&encode_key(&values));

        assert_eq!(values, decoded);
    }

    #[test]
    fn test_composite_key_ordering() {
        // Keys with same first value, different second value
        let pair_key = |a: i64, b: i64| encode_key(&[Value::BigInt(a), Value::BigInt(b)]);
        let key1 = pair_key(1, 1);
        let key2 = pair_key(1, 2);
        let key3 = pair_key(2, 1);

        assert!(key1 < key2, "key1 should be less than key2");
        assert!(key2 < key3, "key2 should be less than key3");
    }

    #[test]
    fn test_successor_key() {
        let key = encode_key(&[Value::BigInt(42)]);
        let succ = successor_key(&key);

        assert!(key < succ, "successor should be greater");
    }

    #[test]
    fn test_null_handling() {
        let key = encode_key(&[Value::Null]);
        assert_eq!(decode_key(&key), vec![Value::Null]);
    }

    #[test]
    fn test_text_ordering_original_bug_case() {
        // The exact bug case from the report
        let short = text_key("b");
        let long = text_key("aaaaaaa");
        assert!(
            long < short,
            "aaaaaaa should be < b in lexicographic ordering"
        );
    }

    #[test]
    fn test_text_with_embedded_nulls() {
        let cases = ["abc", "a\0bc", "a\0\0bc", "\0abc", "abc\0"];

        // Round-trip
        for s in cases {
            assert_eq!(
                decode_key(&text_key(s)),
                vec![Value::Text(s.to_string())],
                "Failed to round-trip: {s:?}"
            );
        }

        // Ordering preserved
        let keys: Vec<_> = cases.iter().map(|s| text_key(s)).collect();
        assert_order_preserved(&cases, &keys);
    }

    #[test]
    fn test_bytes_with_embedded_nulls() {
        let cases: &[&'static [u8]] = &[b"abc", b"a\0bc", b"a\0\0bc", b"\0abc", b"abc\0"];

        // Round-trip
        for &data in cases {
            assert_eq!(
                decode_key(&bytes_key(data)),
                vec![Value::Bytes(Bytes::from(data))],
                "Failed to round-trip: {data:?}"
            );
        }

        // Ordering preserved
        let keys: Vec<_> = cases.iter().map(|&data| bytes_key(data)).collect();
        assert_order_preserved(cases, &keys);
    }

    #[test]
    fn test_empty_text_and_bytes() {
        let text = encode_key(&[Value::Text(String::new())]);
        let bytes = encode_key(&[Value::Bytes(Bytes::new())]);

        assert_eq!(decode_key(&text), vec![Value::Text(String::new())]);
        assert_eq!(decode_key(&bytes), vec![Value::Bytes(Bytes::new())]);
    }

    #[test]
    fn test_composite_key_text_ordering() {
        let int_text_key =
            |n: i64, s: &str| encode_key(&[Value::BigInt(n), Value::Text(s.to_string())]);
        let k1 = int_text_key(1, "aaa");
        let k2 = int_text_key(1, "z");
        let k3 = int_text_key(2, "a");

        assert!(k1 < k2, "aaa should be < z");
        assert!(k2 < k3, "1,z should be < 2,a");
    }

    #[test]
    fn test_text_ordering_various_lengths() {
        let cases = ["a", "aa", "aaa", "b", "ba", "baa"];

        let keys: Vec<_> = cases.iter().map(|s| text_key(s)).collect();

        assert_order_preserved(&cases, &keys);
    }

    #[test]
    fn test_bytes_ordering_with_high_byte_values() {
        let cases: &[&'static [u8]] = &[
            &[0x00],
            &[0x00, 0x00],
            &[0x01],
            &[0x7F],
            &[0xFF],
            &[0xFF, 0x00],
            &[0xFF, 0xFE],
            &[0xFF, 0xFF],
        ];

        let keys: Vec<_> = cases.iter().map(|&data| bytes_key(data)).collect();

        assert_order_preserved(cases, &keys);
    }
}