alopex_sql/storage/
key.rs

1use std::convert::TryInto;
2
3use super::error::{Result, StorageError};
4use super::value::SqlValue;
5
6/// KeyEncoder generates lexicographically ordered keys for table rows and indexes.
7///
8/// Layouts:
9/// - Row key:   0x01 | table_id (u32 BE) | row_id (u64 BE)
10/// - Index key: 0x02 | index_id (u32 BE) | encoded_value(s) | row_id (u64 BE)
11/// - Sequence:  0x04 | table_id (u32 BE)
12pub struct KeyEncoder;
13
14impl KeyEncoder {
15    /// SQL row key.
16    pub fn row_key(table_id: u32, row_id: u64) -> Vec<u8> {
17        let mut buf = Vec::with_capacity(1 + 4 + 8);
18        buf.push(0x01);
19        buf.extend_from_slice(&table_id.to_be_bytes());
20        buf.extend_from_slice(&row_id.to_be_bytes());
21        buf
22    }
23
24    /// Decode SQL row key.
25    pub fn decode_row_key(key: &[u8]) -> Result<(u32, u64)> {
26        if key.len() != 1 + 4 + 8 || key[0] != 0x01 {
27            return Err(StorageError::InvalidKeyFormat);
28        }
29        let table_id = u32::from_be_bytes(key[1..5].try_into().unwrap());
30        let row_id = u64::from_be_bytes(key[5..].try_into().unwrap());
31        Ok((table_id, row_id))
32    }
33
34    /// Prefix for all rows of a table.
35    pub fn table_prefix(table_id: u32) -> Vec<u8> {
36        let mut buf = Vec::with_capacity(1 + 4);
37        buf.push(0x01);
38        buf.extend_from_slice(&table_id.to_be_bytes());
39        buf
40    }
41
42    /// Index key (single value).
43    pub fn index_key(index_id: u32, value: &SqlValue, row_id: u64) -> Result<Vec<u8>> {
44        let mut buf = Vec::with_capacity(1 + 4 + 16);
45        buf.push(0x02);
46        buf.extend_from_slice(&index_id.to_be_bytes());
47        encode_index_value(value, &mut buf)?;
48        buf.extend_from_slice(&row_id.to_be_bytes());
49        Ok(buf)
50    }
51
52    /// Index key (composite).
53    pub fn composite_index_key(index_id: u32, values: &[SqlValue], row_id: u64) -> Result<Vec<u8>> {
54        let mut buf = Vec::with_capacity(1 + 4 + values.len() * 16 + 8);
55        buf.push(0x02);
56        buf.extend_from_slice(&index_id.to_be_bytes());
57        for v in values {
58            encode_index_value(v, &mut buf)?;
59        }
60        buf.extend_from_slice(&row_id.to_be_bytes());
61        Ok(buf)
62    }
63
64    /// Prefix for all entries of an index.
65    pub fn index_prefix(index_id: u32) -> Vec<u8> {
66        let mut buf = Vec::with_capacity(1 + 4);
67        buf.push(0x02);
68        buf.extend_from_slice(&index_id.to_be_bytes());
69        buf
70    }
71
72    /// Prefix for equality lookups on a specific value within an index.
73    pub fn index_value_prefix(index_id: u32, value: &SqlValue) -> Result<Vec<u8>> {
74        let mut buf = Vec::with_capacity(1 + 4 + 16);
75        buf.push(0x02);
76        buf.extend_from_slice(&index_id.to_be_bytes());
77        encode_index_value(value, &mut buf)?;
78        Ok(buf)
79    }
80
81    /// Prefix for equality lookups on a composite value within an index.
82    pub fn composite_index_prefix(index_id: u32, values: &[SqlValue]) -> Result<Vec<u8>> {
83        let mut buf = Vec::with_capacity(1 + 4 + values.len() * 16);
84        buf.push(0x02);
85        buf.extend_from_slice(&index_id.to_be_bytes());
86        for v in values {
87            encode_index_value(v, &mut buf)?;
88        }
89        Ok(buf)
90    }
91
92    /// Sequence key for auto-increment RowID tracking.
93    pub fn sequence_key(table_id: u32) -> Vec<u8> {
94        let mut buf = Vec::with_capacity(1 + 4);
95        buf.push(0x04);
96        buf.extend_from_slice(&table_id.to_be_bytes());
97        buf
98    }
99}
100
101fn encode_index_value(value: &SqlValue, buf: &mut Vec<u8>) -> Result<()> {
102    match value {
103        SqlValue::Null => {
104            buf.push(0x00);
105        }
106        SqlValue::Integer(v) => {
107            buf.push(0x01);
108            let x = (*v as u32) ^ 0x8000_0000;
109            buf.extend_from_slice(&x.to_be_bytes());
110        }
111        SqlValue::BigInt(v) => {
112            buf.push(0x02);
113            let x = (*v as u64) ^ 0x8000_0000_0000_0000;
114            buf.extend_from_slice(&x.to_be_bytes());
115        }
116        SqlValue::Float(v) => {
117            buf.push(0x03);
118            buf.extend_from_slice(&encode_ordered_f32(*v).to_be_bytes());
119        }
120        SqlValue::Double(v) => {
121            buf.push(0x04);
122            buf.extend_from_slice(&encode_ordered_f64(*v).to_be_bytes());
123        }
124        SqlValue::Text(s) => {
125            buf.push(0x05);
126            buf.extend_from_slice(s.as_bytes());
127            buf.push(0x00); // terminator
128        }
129        SqlValue::Blob(bytes) => {
130            buf.push(0x06);
131            let len = u32::try_from(bytes.len())
132                .expect("blob length exceeds u32::MAX (index encoding limit)");
133            buf.extend_from_slice(&len.to_be_bytes());
134            buf.extend_from_slice(bytes);
135        }
136        SqlValue::Boolean(b) => {
137            buf.push(0x07);
138            buf.push(u8::from(*b));
139        }
140        SqlValue::Timestamp(v) => {
141            buf.push(0x08);
142            let x = (*v as u64) ^ 0x8000_0000_0000_0000;
143            buf.extend_from_slice(&x.to_be_bytes());
144        }
145        SqlValue::Vector(_values) => {
146            // Vector ordering is undefined for BTree lexicographic indexes.
147            return Err(StorageError::TypeMismatch {
148                expected: "indexable scalar type".into(),
149                actual: "Vector".into(),
150            });
151        }
152    }
153    Ok(())
154}
155
/// Maps an `f32` to a `u32` whose unsigned order follows the order of the
/// float's sign-magnitude bit pattern.
///
/// Values with the sign bit set have all bits inverted; all other values
/// only have the sign bit set.
fn encode_ordered_f32(v: f32) -> u32 {
    const SIGN_BIT: u32 = 1 << 31;
    // `is_sign_negative` tests exactly the sign bit (including -0.0 and
    // negative NaNs), so it selects the same branch as a raw bit test.
    let flip_mask = if v.is_sign_negative() {
        u32::MAX
    } else {
        SIGN_BIT
    };
    v.to_bits() ^ flip_mask
}
164
/// Maps an `f64` to a `u64` whose unsigned order follows the order of the
/// float's sign-magnitude bit pattern.
///
/// Values with the sign bit set have all bits inverted; all other values
/// only have the sign bit set.
fn encode_ordered_f64(v: f64) -> u64 {
    const SIGN_BIT: u64 = 1 << 63;
    // `is_sign_negative` tests exactly the sign bit (including -0.0 and
    // negative NaNs), so it selects the same branch as a raw bit test.
    let flip_mask = if v.is_sign_negative() {
        u64::MAX
    } else {
        SIGN_BIT
    };
    v.to_bits() ^ flip_mask
}
173
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::cmp::Ordering;

    /// Order used by the blob tests: shorter blobs first, then bytewise.
    fn blob_order(a: &[u8], b: &[u8]) -> Ordering {
        a.len().cmp(&b.len()).then_with(|| a.cmp(b))
    }

    /// Encodes each element of the already-sorted slice as a single-column
    /// key of index 1 (row id = position) and checks every adjacent pair:
    /// the key order must equal the value order, or the values are equal and
    /// the keys ascend (row ids break the tie). `describe` builds the failure
    /// message and is only invoked when a pair fails.
    fn assert_encoding_monotonic<T>(
        sorted: &[T],
        to_sql: impl Fn(&T) -> SqlValue,
        compare: impl Fn(&T, &T) -> Ordering,
        describe: impl Fn(&T, &T) -> String,
    ) {
        let keys: Vec<Vec<u8>> = sorted
            .iter()
            .enumerate()
            .map(|(pos, item)| KeyEncoder::index_key(1, &to_sql(item), pos as u64).unwrap())
            .collect();

        for (items, encoded) in sorted.windows(2).zip(keys.windows(2)) {
            let (prev, curr) = (&items[0], &items[1]);
            let ordering = compare(prev, curr);
            let key_ord = encoded[0].cmp(&encoded[1]);
            assert!(
                ordering == key_ord || (ordering == Ordering::Equal && key_ord == Ordering::Less),
                "{}",
                describe(prev, curr)
            );
        }
    }

    #[test]
    fn row_key_roundtrip() {
        let key = KeyEncoder::row_key(10, 42);
        assert_eq!(key.len(), 13);
        let decoded = KeyEncoder::decode_row_key(&key).unwrap();
        assert_eq!(decoded, (10, 42));
    }

    #[test]
    fn table_prefix_matches_row_key_prefix() {
        let key = KeyEncoder::row_key(7, 1);
        let prefix = KeyEncoder::table_prefix(7);
        assert!(key.starts_with(&prefix));
    }

    #[test]
    fn index_prefix_matches_index_key_prefix() {
        let key = KeyEncoder::index_key(5, &SqlValue::Integer(1), 99).unwrap();
        let prefix = KeyEncoder::index_prefix(5);
        assert!(key.starts_with(&prefix));
    }

    #[test]
    fn integer_ordering_matches_lexicographic() {
        assert_monotonic_ints((-128..=127).collect());
    }

    #[test]
    fn bigint_ordering_matches_lexicographic() {
        assert_monotonic_i64(vec![i64::MIN, -10, -1, 0, 1, 2, 100, i64::MAX]);
    }

    #[test]
    fn float_ordering_matches_lexicographic_with_specials() {
        assert_monotonic_f32(vec![
            -f32::INFINITY,
            -1.5,
            -0.0,
            0.0,
            0.5,
            f32::INFINITY,
            f32::NAN,
        ]);
    }

    #[test]
    fn double_ordering_matches_lexicographic_with_specials() {
        assert_monotonic_f64(vec![
            -f64::INFINITY,
            -123.456,
            -0.0,
            0.0,
            1.2345,
            f64::INFINITY,
            f64::NAN,
        ]);
    }

    #[test]
    fn text_ordering_handles_ascii_and_multibyte() {
        assert_monotonic_text(vec!["", "a", "aa", "b", "é", "あ", "あい", "🍣"]);
    }

    #[test]
    fn blob_ordering_respects_length_then_bytes() {
        assert_monotonic_blob(vec![
            vec![],
            vec![0x00],
            vec![0x00, 0x01],
            vec![0x01],
            vec![0x01, 0x00],
            vec![0xFF],
        ]);
    }

    #[test]
    fn boolean_ordering() {
        assert_monotonic_bool();
    }

    #[test]
    fn vector_is_rejected_for_index_key() {
        let vector = SqlValue::Vector(vec![1.0, 2.0]);
        match KeyEncoder::index_key(1, &vector, 0).unwrap_err() {
            StorageError::TypeMismatch { actual, .. } => assert_eq!(actual, "Vector"),
            other => panic!("expected TypeMismatch for Vector, got {other:?}"),
        }
    }

    #[test]
    fn timestamp_ordering() {
        assert_monotonic_timestamp(vec![-5, -1, 0, 1, 10, i64::MAX]);
    }

    #[test]
    fn composite_key_maintains_lexicographic_tuple_order() {
        let mut tuples = vec![(0, 0), (0, 1), (1, 0), (1, 2), (2, 0), (2, 2)];
        tuples.sort();

        // Row id = position, so equal tuples would still produce ascending keys.
        let keys: Vec<Vec<u8>> = tuples
            .iter()
            .enumerate()
            .map(|(i, &(a, b))| {
                KeyEncoder::composite_index_key(
                    9,
                    &[SqlValue::Integer(a), SqlValue::Integer(b)],
                    i as u64,
                )
                .unwrap()
            })
            .collect();

        for (i, (a, b)) in tuples.iter().enumerate().skip(1) {
            assert!(
                keys[i - 1] < keys[i],
                "composite key ordering violated for ({a},{b}) at position {i}"
            );
        }
    }

    fn assert_monotonic_ints(mut values: Vec<i32>) {
        values.sort();
        assert_encoding_monotonic(
            &values,
            |n| SqlValue::Integer(*n),
            |a, b| a.cmp(b),
            |prev_v, v| format!("integer ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_i64(mut values: Vec<i64>) {
        values.sort();
        assert_encoding_monotonic(
            &values,
            |n| SqlValue::BigInt(*n),
            |a, b| a.cmp(b),
            |prev_v, v| format!("bigint ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_f32(mut values: Vec<f32>) {
        values.sort_by(|a, b| a.total_cmp(b));
        assert_encoding_monotonic(
            &values,
            |x| SqlValue::Float(*x),
            |a, b| a.total_cmp(b),
            |prev_v, v| format!("float ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_f64(mut values: Vec<f64>) {
        values.sort_by(|a, b| a.total_cmp(b));
        assert_encoding_monotonic(
            &values,
            |x| SqlValue::Double(*x),
            |a, b| a.total_cmp(b),
            |prev_v, v| format!("double ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_text(mut values: Vec<&str>) {
        values.sort();
        assert_encoding_monotonic(
            &values,
            |s| SqlValue::Text(s.to_string()),
            |a, b| a.cmp(b),
            |prev_v, v| format!("text ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_blob(mut values: Vec<Vec<u8>>) {
        values.sort_by(|a, b| blob_order(a, b));
        assert_encoding_monotonic(
            &values,
            |bytes| SqlValue::Blob(bytes.clone()),
            |a, b| blob_order(a, b),
            |prev_v, v| format!("blob ordering mismatch: prev={prev_v:?}, curr={v:?}"),
        );
    }

    fn assert_monotonic_bool() {
        let values = [false, true];
        let keys: Vec<Vec<u8>> = values
            .iter()
            .enumerate()
            .map(|(i, &flag)| KeyEncoder::index_key(1, &SqlValue::Boolean(flag), i as u64).unwrap())
            .collect();
        for (flags, encoded) in values.windows(2).zip(keys.windows(2)) {
            let ordering = flags[0].cmp(&flags[1]);
            let key_ord = encoded[0].cmp(&encoded[1]);
            assert_eq!(ordering, key_ord);
        }
    }

    fn assert_monotonic_timestamp(mut values: Vec<i64>) {
        values.sort();
        assert_encoding_monotonic(
            &values,
            |t| SqlValue::Timestamp(*t),
            |a, b| a.cmp(b),
            |prev_v, v| format!("timestamp ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    proptest! {
        #[test]
        fn prop_integer_order_matches_encoded(a in any::<i32>(), b in any::<i32>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Integer(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Integer(b), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_bigint_order_matches_encoded(a in any::<i64>(), b in any::<i64>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::BigInt(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::BigInt(b), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_float_order_matches_encoded(a in any::<f32>(), b in any::<f32>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Float(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Float(b), 1).unwrap();
            let (ord, kord) = (a.total_cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_double_order_matches_encoded(a in any::<f64>(), b in any::<f64>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Double(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Double(b), 1).unwrap();
            let (ord, kord) = (a.total_cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_text_order_matches_encoded(a in ".*", b in ".*") {
            let ka = KeyEncoder::index_key(1, &SqlValue::Text(a.clone()), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Text(b.clone()), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_blob_order_matches_encoded(a in proptest::collection::vec(any::<u8>(), 0..32), b in proptest::collection::vec(any::<u8>(), 0..32)) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Blob(a.clone()), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Blob(b.clone()), 1).unwrap();
            let ord = blob_order(&a, &b);
            let kord = ka.cmp(&kb);
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }
    }
}