fn packed_int_len(value: usize) -> usize {
if value <= 119 {
1
} else if value - 119 <= 0xFF {
2
} else if value - 119 <= 0xFFFF {
3
} else if value - 119 <= 0xFFFFFF {
4
} else {
5
}
}
fn write_packed_int_at(buf: &mut Vec<u8>, start_off: usize, value: usize) {
let len = packed_int_len(value);
if buf.len() < start_off + len {
buf.resize(start_off + len, 0);
}
if value <= 119 {
buf[start_off] = value as u8;
return;
}
let abs_val = (value - 119) as u64;
let marker_off = start_off + len - 1;
match len {
2 => {
buf[start_off] = abs_val as u8;
buf[marker_off] = 120;
}
3 => {
buf[start_off] = (abs_val >> 8) as u8;
buf[start_off + 1] = abs_val as u8;
buf[marker_off] = 121;
}
4 => {
buf[start_off] = (abs_val >> 16) as u8;
buf[start_off + 1] = (abs_val >> 8) as u8;
buf[start_off + 2] = abs_val as u8;
buf[marker_off] = 122;
}
5 => {
buf[start_off] = (abs_val >> 24) as u8;
buf[start_off + 1] = (abs_val >> 16) as u8;
buf[start_off + 2] = (abs_val >> 8) as u8;
buf[start_off + 3] = abs_val as u8;
buf[marker_off] = 123;
}
_ => unreachable!(),
}
}
fn read_packed_int_from_end(buf: &[u8]) -> Option<(usize, usize)> {
if buf.is_empty() {
return None;
}
let marker = buf[buf.len() - 1];
let marker_i = marker as i8;
if (0..=119).contains(&marker_i) {
return Some((marker as usize, 1));
}
if !(120..=123).contains(&marker) {
return None; }
let byte_len = (marker - 119) as usize; let total_len = byte_len + 1;
if buf.len() < total_len {
return None;
}
let val_start = buf.len() - total_len;
let mut abs_val: u64 = 0;
for i in 0..byte_len {
abs_val = (abs_val << 8) | (buf[val_start + i] as u64);
}
let key_size = abs_val as usize + 119;
Some((key_size, total_len))
}
pub fn combine(key: &[u8], data: &[u8]) -> Vec<u8> {
let size_len = packed_int_len(key.len());
let total = key.len() + data.len() + size_len;
let mut buf = Vec::with_capacity(total);
buf.extend_from_slice(key);
buf.extend_from_slice(data);
buf.resize(total, 0);
write_packed_int_at(&mut buf, key.len() + data.len(), key.len());
buf
}
pub fn split(two_part_key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
let (key_size, size_len) = read_packed_int_from_end(two_part_key)?;
let data_end = two_part_key.len().checked_sub(size_len)?;
if key_size > data_end {
return None;
}
let key = two_part_key[..key_size].to_vec();
let data = two_part_key[key_size..data_end].to_vec();
Some((key, data))
}
pub fn get_key(two_part_key: &[u8]) -> Option<Vec<u8>> {
let (key_size, size_len) = read_packed_int_from_end(two_part_key)?;
let data_end = two_part_key.len().checked_sub(size_len)?;
if key_size > data_end {
return None;
}
Some(two_part_key[..key_size].to_vec())
}
pub fn lower_bound(key: &[u8]) -> Vec<u8> {
combine(key, b"")
}
pub fn matches_key(two_part_key: &[u8], primary_key: &[u8]) -> bool {
get_key(two_part_key).map(|k| k == primary_key).unwrap_or(false)
}
pub fn cmp_two_part_keys<K, D>(
a: &[u8],
b: &[u8],
key_cmp: K,
data_cmp: D,
) -> std::cmp::Ordering
where
K: Fn(&[u8], &[u8]) -> std::cmp::Ordering,
D: Fn(&[u8], &[u8]) -> std::cmp::Ordering,
{
let (a_key, a_data) = match split(a) {
Some(kd) => kd,
None => return std::cmp::Ordering::Equal,
};
let (b_key, b_data) = match split(b) {
Some(kd) => kd,
None => return std::cmp::Ordering::Equal,
};
match key_cmp(&a_key, &b_key) {
std::cmp::Ordering::Equal => data_cmp(&a_data, &b_data),
ord => ord,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_combine_split_round_trip() {
let key = b"hello";
let data = b"world";
let combined = combine(key, data);
let (k, d) = split(&combined).unwrap();
assert_eq!(k, key);
assert_eq!(d, data);
}
#[test]
fn test_combine_empty_data() {
let key = b"abc";
let combined = combine(key, b"");
let (k, d) = split(&combined).unwrap();
assert_eq!(k, key);
assert_eq!(d, b"");
}
#[test]
fn test_combine_empty_key() {
let combined = combine(b"", b"data");
let (k, d) = split(&combined).unwrap();
assert_eq!(k, b"");
assert_eq!(d, b"data");
}
#[test]
fn test_lower_bound() {
let lb = lower_bound(b"abc");
let (k, d) = split(&lb).unwrap();
assert_eq!(k, b"abc");
assert_eq!(d, b"");
}
#[test]
fn test_matches_key() {
let two_part = combine(b"abc", b"xyz");
assert!(matches_key(&two_part, b"abc"));
assert!(!matches_key(&two_part, b"ab"));
assert!(!matches_key(&two_part, b"abcd"));
}
#[test]
fn test_cmp_two_part_keys_different_keys() {
let a = combine(b"aaa", b"xyz");
let b = combine(b"bbb", b"abc");
let cmp = cmp_two_part_keys(&a, &b, |x, y| x.cmp(y), |x, y| x.cmp(y));
assert_eq!(cmp, std::cmp::Ordering::Less);
}
#[test]
fn test_cmp_two_part_keys_same_key_diff_data() {
let a = combine(b"key", b"aaa");
let b = combine(b"key", b"bbb");
let cmp = cmp_two_part_keys(&a, &b, |x, y| x.cmp(y), |x, y| x.cmp(y));
assert_eq!(cmp, std::cmp::Ordering::Less);
}
#[test]
fn test_cmp_two_part_keys_equal() {
let a = combine(b"key", b"data");
let b = combine(b"key", b"data");
let cmp = cmp_two_part_keys(&a, &b, |x, y| x.cmp(y), |x, y| x.cmp(y));
assert_eq!(cmp, std::cmp::Ordering::Equal);
}
#[test]
fn test_cmp_correctness_prefix_ambiguity() {
let a = combine(b"a", b"bc");
let b = combine(b"ab", b"c");
let cmp = cmp_two_part_keys(&a, &b, |x, y| x.cmp(y), |x, y| x.cmp(y));
assert_eq!(cmp, std::cmp::Ordering::Less);
}
#[test]
fn test_large_key_round_trip() {
let key = vec![b'k'; 200];
let data = b"data";
let combined = combine(&key, data);
let (k, d) = split(&combined).unwrap();
assert_eq!(k, key);
assert_eq!(d, data);
}
}