use bytes::Bytes;
use kimberlite_store::Key;
use kimberlite_types::Timestamp;
use crate::value::Value;
/// Encodes an `i8` as one byte whose unsigned order matches the signed order.
///
/// The two's-complement bits are reinterpreted as `u8` and the sign bit is
/// flipped, so `i8::MIN` becomes `0x00` and `i8::MAX` becomes `0xFF`.
#[allow(clippy::cast_sign_loss)]
pub fn encode_tinyint(value: i8) -> [u8; 1] {
    const SIGN_BIT: u8 = 0x80;
    [(value as u8) ^ SIGN_BIT]
}
/// Recovers the `i8` from the one-byte form produced by [`encode_tinyint`].
#[allow(dead_code)]
pub fn decode_tinyint(bytes: [u8; 1]) -> i8 {
    const SIGN_BIT: u8 = 0x80;
    let [encoded] = bytes;
    // Flip the sign bit back, then reinterpret the bits as two's complement.
    (encoded ^ SIGN_BIT) as i8
}
/// Encodes an `i16` as two big-endian bytes whose lexicographic order matches
/// the signed numeric order.
///
/// The sign bit is flipped so negative values sort before non-negative ones.
#[allow(clippy::cast_sign_loss)]
pub fn encode_smallint(value: i16) -> [u8; 2] {
    const SIGN_BIT: u16 = 0x8000;
    let flipped = (value as u16) ^ SIGN_BIT;
    u16::to_be_bytes(flipped)
}
/// Recovers the `i16` from the two-byte form produced by [`encode_smallint`].
#[allow(dead_code)]
pub fn decode_smallint(bytes: [u8; 2]) -> i16 {
    const SIGN_BIT: u16 = 0x8000;
    let flipped = u16::from_be_bytes(bytes);
    // Undo the sign-bit flip, then reinterpret as two's complement.
    (flipped ^ SIGN_BIT) as i16
}
/// Encodes an `i32` as four big-endian bytes whose lexicographic order matches
/// the signed numeric order.
///
/// The sign bit is flipped so negative values sort before non-negative ones.
#[allow(clippy::cast_sign_loss)]
pub fn encode_integer(value: i32) -> [u8; 4] {
    const SIGN_BIT: u32 = 0x8000_0000;
    let flipped = (value as u32) ^ SIGN_BIT;
    u32::to_be_bytes(flipped)
}
/// Recovers the `i32` from the four-byte form produced by [`encode_integer`].
#[allow(dead_code)]
pub fn decode_integer(bytes: [u8; 4]) -> i32 {
    const SIGN_BIT: u32 = 0x8000_0000;
    let flipped = u32::from_be_bytes(bytes);
    // Undo the sign-bit flip, then reinterpret as two's complement.
    (flipped ^ SIGN_BIT) as i32
}
/// Encodes an `i64` as eight big-endian bytes whose lexicographic order matches
/// the signed numeric order.
///
/// The sign bit is flipped so negative values sort before non-negative ones.
#[allow(clippy::cast_sign_loss)]
pub fn encode_bigint(value: i64) -> [u8; 8] {
    const SIGN_BIT: u64 = 0x8000_0000_0000_0000;
    let flipped = (value as u64) ^ SIGN_BIT;
    u64::to_be_bytes(flipped)
}
/// Recovers the `i64` from the eight-byte form produced by [`encode_bigint`].
#[allow(dead_code)]
pub fn decode_bigint(bytes: [u8; 8]) -> i64 {
    const SIGN_BIT: u64 = 0x8000_0000_0000_0000;
    let flipped = u64::from_be_bytes(bytes);
    // Undo the sign-bit flip, then reinterpret as two's complement.
    (flipped ^ SIGN_BIT) as i64
}
/// Encodes a timestamp as the eight big-endian bytes of `ts.as_nanos()`.
///
/// No sign-bit adjustment is applied; the nanosecond count is written as-is.
pub fn encode_timestamp(ts: Timestamp) -> [u8; 8] {
    let nanos = ts.as_nanos();
    nanos.to_be_bytes()
}
/// Rebuilds a [`Timestamp`] from the eight big-endian bytes written by
/// [`encode_timestamp`].
#[allow(dead_code)]
pub fn decode_timestamp(bytes: [u8; 8]) -> Timestamp {
    let nanos = u64::from_be_bytes(bytes);
    Timestamp::from_nanos(nanos)
}
/// Encodes an `f64` as eight big-endian bytes that sort in numeric order.
///
/// Values with the sign bit set have every bit inverted (so larger magnitudes
/// sort first); all other values only have the sign bit flipped. `-0.0` and
/// `+0.0` therefore encode to distinct, adjacent keys.
#[allow(clippy::cast_sign_loss)]
pub fn encode_real(value: f64) -> [u8; 8] {
    const SIGN_BIT: u64 = 1 << 63;
    let bits = value.to_bits();
    // XOR with all-ones is a full inversion; XOR with SIGN_BIT flips only the top bit.
    let mask = if bits & SIGN_BIT != 0 {
        u64::MAX
    } else {
        SIGN_BIT
    };
    (bits ^ mask).to_be_bytes()
}
/// Recovers the `f64` from the eight-byte form produced by [`encode_real`].
///
/// A clear top bit marks an originally sign-negative value, which was fully
/// inverted on encode; otherwise only the top bit was flipped.
#[allow(dead_code)]
pub fn decode_real(bytes: [u8; 8]) -> f64 {
    const SIGN_BIT: u64 = 1 << 63;
    let key = u64::from_be_bytes(bytes);
    let mask = if key & SIGN_BIT != 0 {
        SIGN_BIT
    } else {
        u64::MAX
    };
    f64::from_bits(key ^ mask)
}
/// Encodes a decimal as 17 bytes: the sign-flipped `i128` value in big-endian
/// order (bytes 0..16) followed by `scale` (byte 16).
///
/// Ordering follows the raw `value` first and `scale` second; values are not
/// normalised across different scales.
#[allow(clippy::cast_sign_loss)]
pub fn encode_decimal(value: i128, scale: u8) -> [u8; 17] {
    const SIGN_BIT: u128 = 1 << 127;
    let flipped = (value as u128) ^ SIGN_BIT;
    let mut out = [0u8; 17];
    let (mantissa, tail) = out.split_at_mut(16);
    mantissa.copy_from_slice(&flipped.to_be_bytes());
    tail[0] = scale;
    out
}
/// Splits the 17-byte form produced by [`encode_decimal`] back into
/// `(value, scale)`.
#[allow(dead_code)]
pub fn decode_decimal(bytes: [u8; 17]) -> (i128, u8) {
    const SIGN_BIT: u128 = 1 << 127;
    // The first 16 bytes hold the sign-flipped value; the last byte is the scale.
    let [mantissa @ .., scale] = bytes;
    let flipped = u128::from_be_bytes(mantissa);
    ((flipped ^ SIGN_BIT) as i128, scale)
}
/// Encodes a date, given as an `i32`, into four order-preserving bytes.
///
/// Delegates to [`encode_integer`], so the layout is identical to an `Integer`
/// payload.
#[allow(clippy::cast_sign_loss)]
pub fn encode_date(value: i32) -> [u8; 4] {
    encode_integer(value)
}
/// Recovers the `i32` date from the four bytes written by [`encode_date`].
///
/// Delegates to [`decode_integer`].
#[allow(dead_code)]
pub fn decode_date(bytes: [u8; 4]) -> i32 {
    decode_integer(bytes)
}
/// Encodes a time, given as an `i64`, as its raw eight big-endian bytes.
///
/// No sign-bit flip is applied, so the byte order matches the numeric order
/// only for non-negative inputs; a negative value would sort after every
/// non-negative one.
// NOTE(review): presumably callers only pass non-negative values — confirm.
pub fn encode_time(value: i64) -> [u8; 8] {
    i64::to_be_bytes(value)
}
/// Reads an `i64` time back from the eight big-endian bytes written by
/// [`encode_time`].
#[allow(dead_code)]
pub fn decode_time(bytes: [u8; 8]) -> i64 {
    let raw = bytes;
    i64::from_be_bytes(raw)
}
/// Encodes a UUID for use in a key.
///
/// The 16 bytes are returned unchanged, so keys sort by the raw byte order of
/// the UUID.
pub fn encode_uuid(value: [u8; 16]) -> [u8; 16] {
    value
}
/// Decodes a UUID from a key payload.
///
/// Mirror of [`encode_uuid`]: the 16 bytes are returned unchanged.
#[allow(dead_code)]
pub fn decode_uuid(bytes: [u8; 16]) -> [u8; 16] {
    bytes
}
/// Encodes a boolean as a single byte: `0x00` for `false`, `0x01` for `true`,
/// so `false` sorts before `true`.
pub fn encode_boolean(value: bool) -> [u8; 1] {
    if value {
        [0x01]
    } else {
        [0x00]
    }
}
/// Decodes a boolean from a single byte: `0x00` is `false`, any other value is
/// `true`.
#[allow(dead_code)]
pub fn decode_boolean(byte: u8) -> bool {
    !matches!(byte, 0x00)
}
/// Encodes a sequence of values into a single composite [`Key`].
///
/// Each value is written as a one-byte type tag followed by its payload:
///
/// | Tag    | Variant     | Payload                                   |
/// |--------|-------------|-------------------------------------------|
/// | `0x00` | `Null`      | none                                      |
/// | `0x01` | `BigInt`    | [`encode_bigint`] (8 bytes)               |
/// | `0x02` | `Text`      | escaped bytes, `0x00` terminator          |
/// | `0x03` | `Boolean`   | [`encode_boolean`] (1 byte)               |
/// | `0x04` | `Timestamp` | [`encode_timestamp`] (8 bytes)            |
/// | `0x05` | `Bytes`     | escaped bytes, `0x00` terminator          |
/// | `0x06` | `Integer`   | [`encode_integer`] (4 bytes)              |
/// | `0x07` | `SmallInt`  | [`encode_smallint`] (2 bytes)             |
/// | `0x08` | `TinyInt`   | [`encode_tinyint`] (1 byte)               |
/// | `0x09` | `Real`      | [`encode_real`] (8 bytes)                 |
/// | `0x0A` | `Decimal`   | [`encode_decimal`] (17 bytes)             |
/// | `0x0B` | `Uuid`      | [`encode_uuid`] (16 bytes)                |
/// | `0x0D` | `Date`      | [`encode_date`] (4 bytes)                 |
/// | `0x0E` | `Time`      | [`encode_time`] (8 bytes)                 |
///
/// Variable-length payloads escape every embedded `0x00` as `0x00 0xFF` and end
/// with a bare `0x00`, so a shorter string always sorts before any extension
/// of it.
///
/// # Panics
///
/// Panics if `values` contains a `Value::Json` or a `Value::Placeholder`.
pub fn encode_key(values: &[Value]) -> Key {
    /// Appends `tag` followed by a fixed-width `payload`.
    fn put_fixed(buf: &mut Vec<u8>, tag: u8, payload: &[u8]) {
        buf.push(tag);
        buf.extend_from_slice(payload);
    }

    /// Appends `tag`, then `raw` with `0x00` escaped as `0x00 0xFF`, then the
    /// `0x00` terminator.
    fn put_escaped(buf: &mut Vec<u8>, tag: u8, raw: &[u8]) {
        buf.push(tag);
        for &byte in raw {
            if byte == 0x00 {
                buf.extend_from_slice(&[0x00, 0xFF]);
            } else {
                buf.push(byte);
            }
        }
        buf.push(0x00);
    }

    let mut buf = Vec::with_capacity(64);
    for value in values {
        match value {
            Value::Null => buf.push(0x00),
            Value::BigInt(v) => put_fixed(&mut buf, 0x01, &encode_bigint(*v)),
            Value::Text(s) => put_escaped(&mut buf, 0x02, s.as_bytes()),
            Value::Boolean(b) => put_fixed(&mut buf, 0x03, &encode_boolean(*b)),
            Value::Timestamp(ts) => put_fixed(&mut buf, 0x04, &encode_timestamp(*ts)),
            Value::Bytes(b) => put_escaped(&mut buf, 0x05, b),
            Value::Integer(v) => put_fixed(&mut buf, 0x06, &encode_integer(*v)),
            Value::SmallInt(v) => put_fixed(&mut buf, 0x07, &encode_smallint(*v)),
            Value::TinyInt(v) => put_fixed(&mut buf, 0x08, &encode_tinyint(*v)),
            Value::Real(v) => put_fixed(&mut buf, 0x09, &encode_real(*v)),
            Value::Decimal(v, scale) => put_fixed(&mut buf, 0x0A, &encode_decimal(*v, *scale)),
            Value::Uuid(u) => put_fixed(&mut buf, 0x0B, &encode_uuid(*u)),
            Value::Json(_) => {
                panic!(
                    "JSON values cannot be used in primary keys or indexes - they are not orderable"
                )
            }
            Value::Date(d) => put_fixed(&mut buf, 0x0D, &encode_date(*d)),
            Value::Time(t) => put_fixed(&mut buf, 0x0E, &encode_time(*t)),
            Value::Placeholder(idx) => {
                panic!("Cannot encode unbound placeholder ${idx} - bind parameters first")
            }
        }
    }
    Key::from(buf)
}
/// Reads an 8-byte `BigInt` payload starting at `*pos` and advances `*pos`
/// past it.
///
/// # Panics
///
/// Panics if fewer than 8 bytes remain in `bytes` from `*pos`.
#[allow(dead_code)]
#[inline]
fn decode_bigint_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 8;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for BigInt at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end].try_into().expect("BigInt decode failed");
    *pos = end;
    Value::BigInt(decode_bigint(raw))
}
/// Reads an escaped, `0x00`-terminated text payload starting at `*pos` and
/// advances `*pos` past the terminator.
///
/// The sequence `0x00 0xFF` is unescaped to a single `0x00`; a `0x00` followed
/// by anything else (or by the end of input) terminates the value. If the input
/// ends before a terminator is seen, the bytes read so far are used as the
/// value.
///
/// # Panics
///
/// Panics if the unescaped bytes are not valid UTF-8.
#[inline]
fn decode_text_value(bytes: &[u8], pos: &mut usize) -> Value {
    let mut result = Vec::new();
    while let Some(&byte) = bytes.get(*pos) {
        *pos += 1;
        if byte != 0x00 {
            result.push(byte);
            continue;
        }
        if bytes.get(*pos) == Some(&0xFF) {
            // Escaped NUL: keep one 0x00 and skip the 0xFF marker.
            result.push(0x00);
            *pos += 1;
        } else {
            // Bare 0x00 is the terminator.
            break;
        }
    }
    // Validate once and reuse the buffer instead of validating, then copying.
    let text = String::from_utf8(result).expect("Text decode failed: invalid UTF-8");
    Value::Text(text)
}
/// Reads a one-byte `Boolean` payload at `*pos` and advances `*pos` past it.
///
/// # Panics
///
/// Panics if `*pos` is not a valid index into `bytes`.
#[inline]
fn decode_boolean_value(bytes: &[u8], pos: &mut usize) -> Value {
    debug_assert!(
        *pos < bytes.len(),
        "insufficient bytes for Boolean at position {pos}"
    );
    let flag = bytes[*pos];
    *pos += 1;
    Value::Boolean(decode_boolean(flag))
}
/// Reads an 8-byte `Timestamp` payload starting at `*pos` and advances `*pos`
/// past it.
///
/// # Panics
///
/// Panics if fewer than 8 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_timestamp_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 8;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Timestamp at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end]
        .try_into()
        .expect("Timestamp decode failed");
    *pos = end;
    Value::Timestamp(decode_timestamp(raw))
}
/// Reads an escaped, `0x00`-terminated byte-string payload starting at `*pos`
/// and advances `*pos` past the terminator.
///
/// The sequence `0x00 0xFF` is unescaped to a single `0x00`; a `0x00` followed
/// by anything else (or by the end of input) terminates the value. If the input
/// ends before a terminator is seen, the bytes read so far are used as the
/// value.
#[inline]
fn decode_bytes_value(bytes: &[u8], pos: &mut usize) -> Value {
    let mut unescaped = Vec::new();
    while let Some(&byte) = bytes.get(*pos) {
        *pos += 1;
        if byte != 0x00 {
            unescaped.push(byte);
            continue;
        }
        if bytes.get(*pos) == Some(&0xFF) {
            // Escaped NUL: keep one 0x00 and skip the 0xFF marker.
            unescaped.push(0x00);
            *pos += 1;
        } else {
            // Bare 0x00 is the terminator.
            break;
        }
    }
    Value::Bytes(Bytes::from(unescaped))
}
/// Reads a 4-byte `Integer` payload starting at `*pos` and advances `*pos`
/// past it.
///
/// # Panics
///
/// Panics if fewer than 4 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_integer_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 4;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Integer at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end]
        .try_into()
        .expect("Integer decode failed");
    *pos = end;
    Value::Integer(decode_integer(raw))
}
/// Reads a 2-byte `SmallInt` payload starting at `*pos` and advances `*pos`
/// past it.
///
/// # Panics
///
/// Panics if fewer than 2 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_smallint_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 2;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for SmallInt at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end]
        .try_into()
        .expect("SmallInt decode failed");
    *pos = end;
    Value::SmallInt(decode_smallint(raw))
}
/// Reads a one-byte `TinyInt` payload at `*pos` and advances `*pos` past it.
///
/// # Panics
///
/// Panics if `*pos` is not a valid index into `bytes`.
#[inline]
fn decode_tinyint_value(bytes: &[u8], pos: &mut usize) -> Value {
    debug_assert!(
        *pos < bytes.len(),
        "insufficient bytes for TinyInt at position {pos}"
    );
    let raw = [bytes[*pos]];
    *pos += 1;
    Value::TinyInt(decode_tinyint(raw))
}
/// Reads an 8-byte `Real` payload starting at `*pos` and advances `*pos` past
/// it.
///
/// # Panics
///
/// Panics if fewer than 8 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_real_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 8;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Real at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end].try_into().expect("Real decode failed");
    *pos = end;
    Value::Real(decode_real(raw))
}
/// Reads a 17-byte `Decimal` payload (16 value bytes plus 1 scale byte)
/// starting at `*pos` and advances `*pos` past it.
///
/// # Panics
///
/// Panics if fewer than 17 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_decimal_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 17;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Decimal at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end]
        .try_into()
        .expect("Decimal decode failed");
    *pos = end;
    let (unscaled, scale) = decode_decimal(raw);
    Value::Decimal(unscaled, scale)
}
/// Reads a 16-byte `Uuid` payload starting at `*pos` and advances `*pos` past
/// it.
///
/// # Panics
///
/// Panics if fewer than 16 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_uuid_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 16;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Uuid at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end].try_into().expect("Uuid decode failed");
    *pos = end;
    Value::Uuid(decode_uuid(raw))
}
/// Reads a 4-byte `Date` payload starting at `*pos` and advances `*pos` past
/// it.
///
/// # Panics
///
/// Panics if fewer than 4 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_date_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 4;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Date at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end].try_into().expect("Date decode failed");
    *pos = end;
    Value::Date(decode_date(raw))
}
/// Reads an 8-byte `Time` payload starting at `*pos` and advances `*pos` past
/// it.
///
/// # Panics
///
/// Panics if fewer than 8 bytes remain in `bytes` from `*pos`.
#[inline]
fn decode_time_value(bytes: &[u8], pos: &mut usize) -> Value {
    const WIDTH: usize = 8;
    debug_assert!(
        *pos + WIDTH <= bytes.len(),
        "insufficient bytes for Time at position {pos}"
    );
    let end = *pos + WIDTH;
    let raw: [u8; WIDTH] = bytes[*pos..end].try_into().expect("Time decode failed");
    *pos = end;
    Value::Time(decode_time(raw))
}
/// Decodes a composite [`Key`] back into the values it was built from.
///
/// The key is read front to back: each one-byte type tag selects the decoder
/// for the payload that follows, and decoding continues until the bytes are
/// exhausted.
///
/// # Panics
///
/// Panics on the JSON tag (`0x0C`), on an unknown tag, or when a payload is
/// shorter than its type requires.
pub fn decode_key(key: &Key) -> Vec<Value> {
    let bytes = key.as_bytes();
    let mut decoded = Vec::new();
    let mut pos = 0usize;
    while pos < bytes.len() {
        // Remember where the tag sits so the error message can report it.
        let tag_pos = pos;
        let tag = bytes[tag_pos];
        pos += 1;
        decoded.push(match tag {
            0x00 => Value::Null,
            0x01 => decode_bigint_value(bytes, &mut pos),
            0x02 => decode_text_value(bytes, &mut pos),
            0x03 => decode_boolean_value(bytes, &mut pos),
            0x04 => decode_timestamp_value(bytes, &mut pos),
            0x05 => decode_bytes_value(bytes, &mut pos),
            0x06 => decode_integer_value(bytes, &mut pos),
            0x07 => decode_smallint_value(bytes, &mut pos),
            0x08 => decode_tinyint_value(bytes, &mut pos),
            0x09 => decode_real_value(bytes, &mut pos),
            0x0A => decode_decimal_value(bytes, &mut pos),
            0x0B => decode_uuid_value(bytes, &mut pos),
            0x0C => panic!("JSON values cannot be decoded from keys - they are not indexable"),
            0x0D => decode_date_value(bytes, &mut pos),
            0x0E => decode_time_value(bytes, &mut pos),
            _ => panic!("unknown type tag {tag:#04x} at position {}", tag_pos),
        });
    }
    decoded
}
/// Builds the key made of `count` `Value::Null` components.
///
/// Each `Null` is encoded by [`encode_key`] as the single tag byte `0x00`, so
/// the result is `count` zero bytes.
#[allow(dead_code)]
pub fn min_key_for_type(count: usize) -> Key {
    let nulls: Vec<Value> = std::iter::repeat_with(|| Value::Null)
        .take(count)
        .collect();
    encode_key(&nulls)
}
/// Returns a key that sorts strictly after `key`.
///
/// The right-most byte below `0xFF` is incremented and every byte after it is
/// reset to `0x00` (an increment with carry).
///
/// When no byte can be incremented (the key is empty or consists only of
/// `0xFF` bytes), the original bytes are kept and a `0x00` is appended.
/// Zeroing the bytes in that case would produce a key that sorts *before* the
/// input, so they are left untouched.
pub fn successor_key(key: &Key) -> Key {
    let mut result = key.as_bytes().to_vec();
    match result.iter().rposition(|&byte| byte < 0xFF) {
        Some(idx) => {
            result[idx] += 1;
            // Everything to the right was 0xFF; the carry resets it to zero.
            result[idx + 1..].fill(0x00);
        }
        // Nothing to increment: the shortest strictly greater key is `key ++ [0x00]`.
        None => result.push(0x00),
    }
    Key::from(result)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column key holding `text`.
    fn text_key(text: &str) -> Key {
        encode_key(&[Value::Text(text.to_string())])
    }

    /// Builds a single-column key holding `data`.
    fn bytes_key(data: &'static [u8]) -> Key {
        encode_key(&[Value::Bytes(Bytes::from(data))])
    }

    /// Asserts that each adjacent pair of `cases` compares the same way as the
    /// corresponding adjacent pair of `keys`.
    fn assert_adjacent_order_matches<T: Ord + std::fmt::Debug>(cases: &[T], keys: &[Key]) {
        for (case_pair, key_pair) in cases.windows(2).zip(keys.windows(2)) {
            assert_eq!(
                case_pair[0].cmp(&case_pair[1]),
                key_pair[0].cmp(&key_pair[1]),
                "Ordering not preserved between {:?} and {:?}",
                case_pair[0],
                case_pair[1]
            );
        }
    }

    /// Encoded `BigInt`s sort like the integers themselves and round-trip.
    #[test]
    fn test_bigint_encoding_preserves_order() {
        // Listed in ascending numeric order.
        let ascending = [
            i64::MIN,
            i64::MIN + 1,
            -1000,
            -1,
            0,
            1,
            1000,
            i64::MAX - 1,
            i64::MAX,
        ];
        let encoded: Vec<[u8; 8]> = ascending.iter().copied().map(encode_bigint).collect();
        let mut sorted = encoded.clone();
        sorted.sort_unstable();
        assert_eq!(encoded, sorted, "BigInt encoding should preserve ordering");

        for &value in &ascending {
            assert_eq!(decode_bigint(encode_bigint(value)), value);
        }
    }

    /// Encoded timestamps sort like their nanosecond counts and round-trip.
    #[test]
    fn test_timestamp_encoding_preserves_order() {
        let ascending = [0u64, 1, 1000, u64::MAX / 2, u64::MAX];
        let encoded: Vec<[u8; 8]> = ascending
            .iter()
            .map(|&nanos| encode_timestamp(Timestamp::from_nanos(nanos)))
            .collect();
        let mut sorted = encoded.clone();
        sorted.sort_unstable();
        assert_eq!(
            encoded, sorted,
            "Timestamp encoding should preserve ordering"
        );

        for &nanos in &ascending {
            let ts = Timestamp::from_nanos(nanos);
            assert_eq!(decode_timestamp(encode_timestamp(ts)), ts);
        }
    }

    /// A key mixing several value types decodes back to the same values.
    #[test]
    fn test_composite_key_round_trip() {
        let values = vec![
            Value::BigInt(42),
            Value::Text("hello".to_string()),
            Value::Boolean(true),
            Value::Timestamp(Timestamp::from_nanos(12345)),
            Value::Bytes(Bytes::from_static(b"data")),
        ];
        let decoded = decode_key(&encode_key(&values));
        assert_eq!(values, decoded);
    }

    /// Composite keys compare column by column, left to right.
    #[test]
    fn test_composite_key_ordering() {
        let pair = |a: i64, b: i64| encode_key(&[Value::BigInt(a), Value::BigInt(b)]);
        let key1 = pair(1, 1);
        let key2 = pair(1, 2);
        let key3 = pair(2, 1);
        assert!(key1 < key2, "key1 should be less than key2");
        assert!(key2 < key3, "key2 should be less than key3");
    }

    /// The successor of a key sorts after the key.
    #[test]
    fn test_successor_key() {
        let key = encode_key(&[Value::BigInt(42)]);
        let succ = successor_key(&key);
        assert!(key < succ, "successor should be greater");
    }

    /// A lone `Null` round-trips.
    #[test]
    fn test_null_handling() {
        let decoded = decode_key(&encode_key(&[Value::Null]));
        assert_eq!(decoded, vec![Value::Null]);
    }

    /// A longer string with a smaller first byte still sorts first.
    #[test]
    fn test_text_ordering_original_bug_case() {
        let short = text_key("b");
        let long = text_key("aaaaaaa");
        assert!(
            long < short,
            "aaaaaaa should be < b in lexicographic ordering"
        );
    }

    /// Text containing NUL bytes round-trips and keeps its ordering.
    #[test]
    fn test_text_with_embedded_nulls() {
        let cases = ["abc", "a\0bc", "a\0\0bc", "\0abc", "abc\0"];
        for s in &cases {
            let decoded = decode_key(&text_key(s));
            assert_eq!(
                decoded,
                vec![Value::Text((*s).to_string())],
                "Failed to round-trip: {s:?}"
            );
        }
        let keys: Vec<Key> = cases.iter().map(|s| text_key(s)).collect();
        assert_adjacent_order_matches(&cases, &keys);
    }

    /// Byte strings containing NUL bytes round-trip and keep their ordering.
    #[test]
    fn test_bytes_with_embedded_nulls() {
        let cases: &[&[u8]] = &[b"abc", b"a\0bc", b"a\0\0bc", b"\0abc", b"abc\0"];
        for &data in cases {
            let decoded = decode_key(&bytes_key(data));
            assert_eq!(
                decoded,
                vec![Value::Bytes(Bytes::from(data))],
                "Failed to round-trip: {data:?}"
            );
        }
        let keys: Vec<Key> = cases.iter().map(|&data| bytes_key(data)).collect();
        assert_adjacent_order_matches(cases, &keys);
    }

    /// Empty text and empty byte strings round-trip.
    #[test]
    fn test_empty_text_and_bytes() {
        let text = encode_key(&[Value::Text(String::new())]);
        let bytes = encode_key(&[Value::Bytes(Bytes::new())]);
        assert_eq!(decode_key(&text), vec![Value::Text(String::new())]);
        assert_eq!(decode_key(&bytes), vec![Value::Bytes(Bytes::new())]);
    }

    /// Text ordering holds when the text is the second column of a key.
    #[test]
    fn test_composite_key_text_ordering() {
        let row = |id: i64, text: &str| encode_key(&[Value::BigInt(id), Value::Text(text.to_string())]);
        let k1 = row(1, "aaa");
        let k2 = row(1, "z");
        let k3 = row(2, "a");
        assert!(k1 < k2, "aaa should be < z");
        assert!(k2 < k3, "1,z should be < 2,a");
    }

    /// Strings that are prefixes of one another keep lexicographic order.
    #[test]
    fn test_text_ordering_various_lengths() {
        let cases = ["a", "aa", "aaa", "b", "ba", "baa"];
        let keys: Vec<Key> = cases.iter().map(|s| text_key(s)).collect();
        assert_adjacent_order_matches(&cases, &keys);
    }

    /// Ordering holds for byte strings at the extremes of the byte range.
    #[test]
    fn test_bytes_ordering_with_high_byte_values() {
        let cases: &[&[u8]] = &[
            &[0x00],
            &[0x00, 0x00],
            &[0x01],
            &[0x7F],
            &[0xFF],
            &[0xFF, 0x00],
            &[0xFF, 0xFE],
            &[0xFF, 0xFF],
        ];
        let keys: Vec<Key> = cases.iter().map(|&data| bytes_key(data)).collect();
        assert_adjacent_order_matches(cases, &keys);
    }
}