1use std::convert::TryInto;
2
3use super::error::{Result, StorageError};
4use super::value::SqlValue;
5
/// Stateless namespace for the byte-key builders and parsers defined in its `impl` block.
///
/// The type carries no data; every operation is an associated function.
pub struct KeyEncoder;
13
14impl KeyEncoder {
15 pub fn row_key(table_id: u32, row_id: u64) -> Vec<u8> {
17 let mut buf = Vec::with_capacity(1 + 4 + 8);
18 buf.push(0x01);
19 buf.extend_from_slice(&table_id.to_be_bytes());
20 buf.extend_from_slice(&row_id.to_be_bytes());
21 buf
22 }
23
24 pub fn decode_row_key(key: &[u8]) -> Result<(u32, u64)> {
26 if key.len() != 1 + 4 + 8 || key[0] != 0x01 {
27 return Err(StorageError::InvalidKeyFormat);
28 }
29 let table_id = u32::from_be_bytes(key[1..5].try_into().unwrap());
30 let row_id = u64::from_be_bytes(key[5..].try_into().unwrap());
31 Ok((table_id, row_id))
32 }
33
34 pub fn table_prefix(table_id: u32) -> Vec<u8> {
36 let mut buf = Vec::with_capacity(1 + 4);
37 buf.push(0x01);
38 buf.extend_from_slice(&table_id.to_be_bytes());
39 buf
40 }
41
42 pub fn index_key(index_id: u32, value: &SqlValue, row_id: u64) -> Result<Vec<u8>> {
44 let mut buf = Vec::with_capacity(1 + 4 + 16);
45 buf.push(0x02);
46 buf.extend_from_slice(&index_id.to_be_bytes());
47 encode_index_value(value, &mut buf)?;
48 buf.extend_from_slice(&row_id.to_be_bytes());
49 Ok(buf)
50 }
51
52 pub fn composite_index_key(index_id: u32, values: &[SqlValue], row_id: u64) -> Result<Vec<u8>> {
54 let mut buf = Vec::with_capacity(1 + 4 + values.len() * 16 + 8);
55 buf.push(0x02);
56 buf.extend_from_slice(&index_id.to_be_bytes());
57 for v in values {
58 encode_index_value(v, &mut buf)?;
59 }
60 buf.extend_from_slice(&row_id.to_be_bytes());
61 Ok(buf)
62 }
63
64 pub fn index_prefix(index_id: u32) -> Vec<u8> {
66 let mut buf = Vec::with_capacity(1 + 4);
67 buf.push(0x02);
68 buf.extend_from_slice(&index_id.to_be_bytes());
69 buf
70 }
71
72 pub fn index_value_prefix(index_id: u32, value: &SqlValue) -> Result<Vec<u8>> {
74 let mut buf = Vec::with_capacity(1 + 4 + 16);
75 buf.push(0x02);
76 buf.extend_from_slice(&index_id.to_be_bytes());
77 encode_index_value(value, &mut buf)?;
78 Ok(buf)
79 }
80
81 pub fn composite_index_prefix(index_id: u32, values: &[SqlValue]) -> Result<Vec<u8>> {
83 let mut buf = Vec::with_capacity(1 + 4 + values.len() * 16);
84 buf.push(0x02);
85 buf.extend_from_slice(&index_id.to_be_bytes());
86 for v in values {
87 encode_index_value(v, &mut buf)?;
88 }
89 Ok(buf)
90 }
91
92 pub fn sequence_key(table_id: u32) -> Vec<u8> {
94 let mut buf = Vec::with_capacity(1 + 4);
95 buf.push(0x04);
96 buf.extend_from_slice(&table_id.to_be_bytes());
97 buf
98 }
99}
100
101fn encode_index_value(value: &SqlValue, buf: &mut Vec<u8>) -> Result<()> {
102 match value {
103 SqlValue::Null => {
104 buf.push(0x00);
105 }
106 SqlValue::Integer(v) => {
107 buf.push(0x01);
108 let x = (*v as u32) ^ 0x8000_0000;
109 buf.extend_from_slice(&x.to_be_bytes());
110 }
111 SqlValue::BigInt(v) => {
112 buf.push(0x02);
113 let x = (*v as u64) ^ 0x8000_0000_0000_0000;
114 buf.extend_from_slice(&x.to_be_bytes());
115 }
116 SqlValue::Float(v) => {
117 buf.push(0x03);
118 buf.extend_from_slice(&encode_ordered_f32(*v).to_be_bytes());
119 }
120 SqlValue::Double(v) => {
121 buf.push(0x04);
122 buf.extend_from_slice(&encode_ordered_f64(*v).to_be_bytes());
123 }
124 SqlValue::Text(s) => {
125 buf.push(0x05);
126 buf.extend_from_slice(s.as_bytes());
127 buf.push(0x00); }
129 SqlValue::Blob(bytes) => {
130 buf.push(0x06);
131 let len = u32::try_from(bytes.len())
132 .expect("blob length exceeds u32::MAX (index encoding limit)");
133 buf.extend_from_slice(&len.to_be_bytes());
134 buf.extend_from_slice(bytes);
135 }
136 SqlValue::Boolean(b) => {
137 buf.push(0x07);
138 buf.push(u8::from(*b));
139 }
140 SqlValue::Timestamp(v) => {
141 buf.push(0x08);
142 let x = (*v as u64) ^ 0x8000_0000_0000_0000;
143 buf.extend_from_slice(&x.to_be_bytes());
144 }
145 SqlValue::Vector(_values) => {
146 return Err(StorageError::TypeMismatch {
148 expected: "indexable scalar type".into(),
149 actual: "Vector".into(),
150 });
151 }
152 }
153 Ok(())
154}
155
/// Maps an `f32` to a `u32` whose unsigned order follows the float's bit-level
/// order: values with the sign bit set come first, in reverse magnitude.
///
/// A value with the sign bit set has every bit inverted; any other value only
/// has its sign bit set.
fn encode_ordered_f32(v: f32) -> u32 {
    const SIGN_BIT: u32 = 1 << 31;

    let raw = v.to_bits();
    // XOR with all ones is a full bitwise NOT; XOR with SIGN_BIT flips only the top bit.
    let mask = if raw & SIGN_BIT == 0 { SIGN_BIT } else { u32::MAX };
    raw ^ mask
}
164
/// Maps an `f64` to a `u64` whose unsigned order follows the float's bit-level
/// order: values with the sign bit set come first, in reverse magnitude.
///
/// A value with the sign bit set has every bit inverted; any other value only
/// has its sign bit set.
fn encode_ordered_f64(v: f64) -> u64 {
    const SIGN_BIT: u64 = 1 << 63;

    let raw = v.to_bits();
    // XOR with all ones is a full bitwise NOT; XOR with SIGN_BIT flips only the top bit.
    let mask = if raw & SIGN_BIT == 0 { SIGN_BIT } else { u64::MAX };
    raw ^ mask
}
173
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::cmp::Ordering;

    /// A row key is 13 bytes long and decodes back to the ids it was built from.
    #[test]
    fn row_key_roundtrip() {
        let key = KeyEncoder::row_key(10, 42);
        assert_eq!(key.len(), 13);
        assert_eq!(KeyEncoder::decode_row_key(&key).unwrap(), (10, 42));
    }

    /// The table prefix is a prefix of every row key of that table.
    #[test]
    fn table_prefix_matches_row_key_prefix() {
        let key = KeyEncoder::row_key(7, 1);
        let prefix = KeyEncoder::table_prefix(7);
        assert!(key.starts_with(&prefix));
    }

    /// The index prefix is a prefix of every key of that index.
    #[test]
    fn index_prefix_matches_index_key_prefix() {
        let key = KeyEncoder::index_key(5, &SqlValue::Integer(1), 99).unwrap();
        let prefix = KeyEncoder::index_prefix(5);
        assert!(key.starts_with(&prefix));
    }

    #[test]
    fn integer_ordering_matches_lexicographic() {
        assert_monotonic_ints((-128..=127).collect());
    }

    #[test]
    fn bigint_ordering_matches_lexicographic() {
        assert_monotonic_i64(vec![i64::MIN, -10, -1, 0, 1, 2, 100, i64::MAX]);
    }

    #[test]
    fn float_ordering_matches_lexicographic_with_specials() {
        assert_monotonic_f32(vec![
            -f32::INFINITY,
            -1.5,
            -0.0,
            0.0,
            0.5,
            f32::INFINITY,
            f32::NAN,
        ]);
    }

    #[test]
    fn double_ordering_matches_lexicographic_with_specials() {
        assert_monotonic_f64(vec![
            -f64::INFINITY,
            -123.456,
            -0.0,
            0.0,
            1.2345,
            f64::INFINITY,
            f64::NAN,
        ]);
    }

    #[test]
    fn text_ordering_handles_ascii_and_multibyte() {
        assert_monotonic_text(vec!["", "a", "aa", "b", "é", "あ", "あい", "🍣"]);
    }

    #[test]
    fn blob_ordering_respects_length_then_bytes() {
        assert_monotonic_blob(vec![
            vec![],
            vec![0x00],
            vec![0x00, 0x01],
            vec![0x01],
            vec![0x01, 0x00],
            vec![0xFF],
        ]);
    }

    #[test]
    fn boolean_ordering() {
        assert_monotonic_bool();
    }

    /// Vectors cannot be indexed; the encoder reports them as a type mismatch.
    #[test]
    fn vector_is_rejected_for_index_key() {
        let outcome = KeyEncoder::index_key(1, &SqlValue::Vector(vec![1.0, 2.0]), 0);
        match outcome.unwrap_err() {
            StorageError::TypeMismatch { actual, .. } => assert_eq!(actual, "Vector"),
            other => panic!("expected TypeMismatch for Vector, got {other:?}"),
        }
    }

    #[test]
    fn timestamp_ordering() {
        assert_monotonic_timestamp(vec![-5, -1, 0, 1, 10, i64::MAX]);
    }

    /// Composite keys of sorted tuples must themselves be strictly increasing.
    #[test]
    fn composite_key_maintains_lexicographic_tuple_order() {
        let mut tuples = vec![(0, 0), (0, 1), (1, 0), (1, 2), (2, 0), (2, 2)];
        tuples.sort();

        // The position in the sorted list doubles as the row id.
        let keys: Vec<Vec<u8>> = tuples
            .iter()
            .enumerate()
            .map(|(i, (a, b))| {
                KeyEncoder::composite_index_key(
                    9,
                    &[SqlValue::Integer(*a), SqlValue::Integer(*b)],
                    i as u64,
                )
                .unwrap()
            })
            .collect();

        for i in 1..keys.len() {
            let (a, b) = tuples[i];
            assert!(
                keys[i - 1] < keys[i],
                "composite key ordering violated for ({a},{b}) at position {i}"
            );
        }
    }

    /// Walks `sorted` in order, encoding each element as a single-column key of
    /// index 1 with its position as the row id, and checks each key against
    /// its predecessor.
    ///
    /// The key order must equal the order reported by `compare`; elements that
    /// compare equal may also yield a strictly smaller earlier key, because the
    /// increasing row id breaks the tie. On the first violation the helper
    /// panics with the message built by `describe(previous, current)`.
    fn check_sorted_run<T>(
        sorted: &[T],
        to_value: impl Fn(&T) -> SqlValue,
        compare: impl Fn(&T, &T) -> Ordering,
        describe: impl Fn(&T, &T) -> String,
    ) {
        let mut previous: Option<(Vec<u8>, &T)> = None;
        for (row_id, current) in sorted.iter().enumerate() {
            let key = KeyEncoder::index_key(1, &to_value(current), row_id as u64).unwrap();
            if let Some((prev_key, prev_item)) = previous {
                let expected = compare(prev_item, current);
                let actual = prev_key.cmp(&key);
                let consistent = expected == actual
                    || (expected == Ordering::Equal && actual == Ordering::Less);
                if !consistent {
                    panic!("{}", describe(prev_item, current));
                }
            }
            previous = Some((key, current));
        }
    }

    /// Blob order used by the tests: shorter blobs first, ties broken by bytes.
    fn blob_order(a: &[u8], b: &[u8]) -> Ordering {
        a.len().cmp(&b.len()).then_with(|| a.cmp(b))
    }

    fn assert_monotonic_ints(values: Vec<i32>) {
        let mut sorted = values;
        sorted.sort();
        check_sorted_run(
            &sorted,
            |v| SqlValue::Integer(*v),
            |a, b| a.cmp(b),
            |prev_v, v| format!("integer ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_i64(values: Vec<i64>) {
        let mut sorted = values;
        sorted.sort();
        check_sorted_run(
            &sorted,
            |v| SqlValue::BigInt(*v),
            |a, b| a.cmp(b),
            |prev_v, v| format!("bigint ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_f32(values: Vec<f32>) {
        let mut sorted = values;
        sorted.sort_by(|a, b| a.total_cmp(b));
        check_sorted_run(
            &sorted,
            |v| SqlValue::Float(*v),
            |a, b| a.total_cmp(b),
            |prev_v, v| format!("float ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_f64(values: Vec<f64>) {
        let mut sorted = values;
        sorted.sort_by(|a, b| a.total_cmp(b));
        check_sorted_run(
            &sorted,
            |v| SqlValue::Double(*v),
            |a, b| a.total_cmp(b),
            |prev_v, v| format!("double ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_text(values: Vec<&str>) {
        let mut sorted = values;
        sorted.sort();
        check_sorted_run(
            &sorted,
            |v| SqlValue::Text((*v).to_string()),
            |a, b| a.cmp(b),
            |prev_v, v| format!("text ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    fn assert_monotonic_blob(values: Vec<Vec<u8>>) {
        let mut sorted = values;
        sorted.sort_by(|a, b| blob_order(a, b));
        check_sorted_run(
            &sorted,
            |v| SqlValue::Blob(v.clone()),
            |a, b| blob_order(a, b),
            |prev_v, v| format!("blob ordering mismatch: prev={prev_v:?}, curr={v:?}"),
        );
    }

    /// `false` must encode strictly before `true`.
    fn assert_monotonic_bool() {
        let key_false = KeyEncoder::index_key(1, &SqlValue::Boolean(false), 0).unwrap();
        let key_true = KeyEncoder::index_key(1, &SqlValue::Boolean(true), 1).unwrap();
        let ordering = false.cmp(&true);
        let key_ord = key_false.cmp(&key_true);
        assert_eq!(ordering, key_ord);
    }

    fn assert_monotonic_timestamp(values: Vec<i64>) {
        let mut sorted = values;
        sorted.sort();
        check_sorted_run(
            &sorted,
            |v| SqlValue::Timestamp(*v),
            |a, b| a.cmp(b),
            |prev_v, v| format!("timestamp ordering mismatch: prev={prev_v}, curr={v}"),
        );
    }

    // Property tests: for arbitrary pairs, the encoded keys must compare the
    // same way as the values. `a` gets row id 0 and `b` row id 1, so equal
    // values are expected to yield `Less`.
    proptest! {
        #[test]
        fn prop_integer_order_matches_encoded(a in any::<i32>(), b in any::<i32>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Integer(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Integer(b), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_bigint_order_matches_encoded(a in any::<i64>(), b in any::<i64>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::BigInt(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::BigInt(b), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_float_order_matches_encoded(a in any::<f32>(), b in any::<f32>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Float(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Float(b), 1).unwrap();
            let (ord, kord) = (a.total_cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_double_order_matches_encoded(a in any::<f64>(), b in any::<f64>()) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Double(a), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Double(b), 1).unwrap();
            let (ord, kord) = (a.total_cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_text_order_matches_encoded(a in ".*", b in ".*") {
            let ka = KeyEncoder::index_key(1, &SqlValue::Text(a.clone()), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Text(b.clone()), 1).unwrap();
            let (ord, kord) = (a.cmp(&b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }

        #[test]
        fn prop_blob_order_matches_encoded(a in proptest::collection::vec(any::<u8>(), 0..32), b in proptest::collection::vec(any::<u8>(), 0..32)) {
            let ka = KeyEncoder::index_key(1, &SqlValue::Blob(a.clone()), 0).unwrap();
            let kb = KeyEncoder::index_key(1, &SqlValue::Blob(b.clone()), 1).unwrap();
            let (ord, kord) = (blob_order(&a, &b), ka.cmp(&kb));
            prop_assert!(ord == kord || (ord == Ordering::Equal && kord == Ordering::Less));
        }
    }
}