use std::cmp::Ordering;
use std::sync::Arc;
use crate::{InternalKey, InternalKeyKind, INTERNAL_KEY_SEQ_NUM_MAX, INTERNAL_KEY_TIMESTAMP_MAX};
/// Ordering and key-shortening operations over keys represented as byte slices.
///
/// Implementors must be `Send + Sync` so a comparator can be shared across threads.
pub trait Comparator: Send + Sync {
    /// Returns how `a` orders relative to `b` under this comparator.
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering;

    /// Returns a name identifying this comparator.
    fn name(&self) -> &str;

    /// Returns a key derived from `from`, taking `to` as the upper limit.
    ///
    /// The implementations in this file either return `from` unchanged or a
    /// (typically shorter) key that orders after `from` and before `to`.
    fn separator(&self, from: &[u8], to: &[u8]) -> Vec<u8>;

    /// Returns a key derived from `key`.
    ///
    /// The implementations in this file either return `key` unchanged or a
    /// (typically shorter) key that orders after it.
    fn successor(&self, key: &[u8]) -> Vec<u8>;
}
/// Stateless comparator type.
///
/// It carries no data; its behavior comes entirely from its `Comparator`
/// implementation, so no inherent `impl` block is needed.
#[derive(Default, Clone, Copy)]
pub struct BytewiseComparator {}
/// Lexicographic ordering over raw bytes, plus key-shortening helpers.
impl Comparator for BytewiseComparator {
    /// Orders `a` and `b` lexicographically by byte value.
    #[inline]
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering {
        Ord::cmp(a, b)
    }

    /// Returns the fixed identifier of this comparator.
    #[inline]
    fn name(&self) -> &'static str {
        "leveldb.BytewiseComparator"
    }

    /// Returns a short key `s` with `a <= s < b` when one can be found by
    /// incrementing a single byte of `a` and truncating after it.
    ///
    /// `a` is returned unchanged when one input is a prefix of the other
    /// (including equal inputs), when `a` sorts after `b` at the first
    /// differing byte, or when no byte of `a` can be incremented without
    /// reaching `b`.
    #[inline]
    fn separator(&self, a: &[u8], b: &[u8]) -> Vec<u8> {
        // First position at which the inputs disagree. If there is none, one
        // input is a prefix of the other and nothing can be shortened.
        let split = match a.iter().zip(b).position(|(lhs, rhs)| lhs != rhs) {
            Some(index) => index,
            None => return a.to_vec(),
        };

        let (low, high) = (a[split], b[split]);
        if low >= high {
            return a.to_vec();
        }

        // Pick the byte to increment. Bumping the differing byte is safe when
        // `b` continues past it (the result is then a strict prefix of `b` or
        // smaller) or when the two bytes are more than one apart. Otherwise
        // the bump would produce `b` itself, so look for the first later byte
        // of `a` that is not 0xff.
        let bump_at = if split + 1 < b.len() || high - low > 1 {
            Some(split)
        } else {
            a[split + 1..]
                .iter()
                .position(|&byte| byte != 0xff)
                .map(|offset| split + 1 + offset)
        };

        match bump_at {
            Some(index) => {
                let mut shortened = a[..=index].to_vec();
                shortened[index] += 1;
                debug_assert!(self.compare(&shortened, b) == Ordering::Less);
                shortened
            }
            // Everything after the differing byte is 0xff: keep `a` as is.
            None => a.to_vec(),
        }
    }

    /// Returns a short key that is `>= key`: the prefix of `key` up to and
    /// including its first non-0xff byte, with that byte incremented.
    ///
    /// An empty key, or a key consisting only of 0xff bytes, is returned
    /// unchanged.
    #[inline]
    fn successor(&self, key: &[u8]) -> Vec<u8> {
        match key.iter().position(|&byte| byte != 0xff) {
            Some(index) => {
                let mut shortened = key[..=index].to_vec();
                shortened[index] += 1;
                shortened
            }
            None => key.to_vec(),
        }
    }
}
/// Comparator over encoded internal keys that delegates user-key handling to
/// a wrapped [`Comparator`].
///
/// Cloning is cheap: the wrapped comparator is shared through an `Arc`.
#[derive(Clone)]
pub struct InternalKeyComparator {
    /// Comparator applied to the user-key portion of each internal key.
    user_comparator: Arc<dyn Comparator>,
}

impl InternalKeyComparator {
    /// Creates a comparator that uses `user_comparator` for user keys.
    pub fn new(user_comparator: Arc<dyn Comparator>) -> Self {
        InternalKeyComparator { user_comparator }
    }
}
/// Ordering and key shortening for encoded internal keys.
impl Comparator for InternalKeyComparator {
    /// Returns the fixed identifier of this comparator.
    fn name(&self) -> &'static str {
        "surrealkv.InternalKeyComparator"
    }

    /// Orders two encoded internal keys by user key (via the wrapped
    /// comparator); ties are broken by sequence number in descending order,
    /// so the larger sequence number sorts first.
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering {
        let left_user_key = InternalKey::user_key_from_encoded(a);
        let right_user_key = InternalKey::user_key_from_encoded(b);
        self.user_comparator.compare(left_user_key, right_user_key).then_with(|| {
            let left_seq = InternalKey::seq_num_from_encoded(a);
            let right_seq = InternalKey::seq_num_from_encoded(b);
            // Operands are swapped on purpose: higher sequence numbers first.
            right_seq.cmp(&left_seq)
        })
    }

    /// Returns an encoded internal key derived from `a`, taking `b` as the limit.
    ///
    /// When the user keys differ and the wrapped comparator yields a user-key
    /// separator that is no longer than `a`'s user key and orders strictly
    /// after it, that separator is encoded with `INTERNAL_KEY_SEQ_NUM_MAX`,
    /// `InternalKeyKind::Separator` and `INTERNAL_KEY_TIMESTAMP_MAX`.
    /// In every other case `a` is returned unchanged.
    fn separator(&self, a: &[u8], b: &[u8]) -> Vec<u8> {
        if a == b {
            return a.to_vec();
        }

        let start = InternalKey::decode(a);
        let limit = InternalKey::decode(b);
        let user = &self.user_comparator;

        // Same user key: only sequence number / kind differ, nothing to shorten.
        if user.compare(start.user_key.as_ref(), limit.user_key.as_ref()) == Ordering::Equal {
            return a.to_vec();
        }

        let candidate = user.separator(start.user_key.as_ref(), limit.user_key.as_ref());
        let usable = candidate.len() <= start.user_key.len()
            && user.compare(start.user_key.as_ref(), &candidate) == Ordering::Less;
        if !usable {
            return a.to_vec();
        }

        InternalKey::new(
            candidate,
            INTERNAL_KEY_SEQ_NUM_MAX,
            InternalKeyKind::Separator,
            INTERNAL_KEY_TIMESTAMP_MAX,
        )
        .encode()
    }

    /// Returns an encoded internal key derived from `key`.
    ///
    /// When the wrapped comparator yields a user-key successor that is no
    /// longer than the original user key and orders strictly after it, that
    /// successor is encoded with `INTERNAL_KEY_SEQ_NUM_MAX`,
    /// `InternalKeyKind::Separator` and `INTERNAL_KEY_TIMESTAMP_MAX`.
    /// Otherwise `key` is returned unchanged.
    fn successor(&self, key: &[u8]) -> Vec<u8> {
        let decoded = InternalKey::decode(key);
        let candidate = self.user_comparator.successor(decoded.user_key.as_ref());
        let advanced = candidate.len() <= decoded.user_key.len()
            && self.user_comparator.compare(decoded.user_key.as_ref(), &candidate)
                == Ordering::Less;

        if advanced {
            InternalKey::new(
                candidate,
                INTERNAL_KEY_SEQ_NUM_MAX,
                InternalKeyKind::Separator,
                INTERNAL_KEY_TIMESTAMP_MAX,
            )
            .encode()
        } else {
            key.to_vec()
        }
    }
}
/// Comparator over encoded internal keys whose `compare` orders by timestamp
/// and whose key shortening delegates to a wrapped [`Comparator`].
///
/// Cloning is cheap: the wrapped comparator is shared through an `Arc`.
#[derive(Clone)]
pub struct TimestampComparator {
    /// Comparator applied to user keys when shortening keys.
    user_comparator: Arc<dyn Comparator>,
}

impl TimestampComparator {
    /// Creates a comparator that uses `user_comparator` for user keys.
    pub fn new(user_comparator: Arc<dyn Comparator>) -> Self {
        TimestampComparator { user_comparator }
    }
}
/// Timestamp-based ordering and key shortening for encoded internal keys.
impl Comparator for TimestampComparator {
    /// Returns the fixed identifier of this comparator.
    fn name(&self) -> &'static str {
        "surrealkv.TimestampComparator"
    }

    /// Decodes both keys and orders them with `InternalKey::cmp_by_timestamp`.
    ///
    /// NOTE(review): the wrapped `user_comparator` is not consulted here;
    /// confirm that `cmp_by_timestamp` orders user keys consistently with it.
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering {
        let left = InternalKey::decode(a);
        let right = InternalKey::decode(b);
        left.cmp_by_timestamp(&right)
    }

    /// Returns an encoded internal key derived from `a`, taking `b` as the limit.
    ///
    /// A shortened key is produced only when the user keys differ and the
    /// wrapped comparator's user-key separator is no longer than `a`'s user
    /// key and orders strictly after it; it is then encoded with
    /// `INTERNAL_KEY_SEQ_NUM_MAX`, `InternalKeyKind::Separator` and
    /// `INTERNAL_KEY_TIMESTAMP_MAX`. Otherwise `a` is returned unchanged.
    fn separator(&self, a: &[u8], b: &[u8]) -> Vec<u8> {
        if a == b {
            return a.to_vec();
        }

        let first = InternalKey::decode(a);
        let second = InternalKey::decode(b);
        let user_keys_differ = self
            .user_comparator
            .compare(first.user_key.as_ref(), second.user_key.as_ref())
            != Ordering::Equal;

        if user_keys_differ {
            let shortened = self
                .user_comparator
                .separator(first.user_key.as_ref(), second.user_key.as_ref());
            let fits = shortened.len() <= first.user_key.len();
            if fits
                && self.user_comparator.compare(first.user_key.as_ref(), &shortened)
                    == Ordering::Less
            {
                return InternalKey::new(
                    shortened,
                    INTERNAL_KEY_SEQ_NUM_MAX,
                    InternalKeyKind::Separator,
                    INTERNAL_KEY_TIMESTAMP_MAX,
                )
                .encode();
            }
        }

        a.to_vec()
    }

    /// Returns an encoded internal key derived from `key`.
    ///
    /// A shortened key is produced only when the wrapped comparator's
    /// user-key successor is no longer than the original user key and orders
    /// strictly after it; it is then encoded with `INTERNAL_KEY_SEQ_NUM_MAX`,
    /// `InternalKeyKind::Separator` and `INTERNAL_KEY_TIMESTAMP_MAX`.
    /// Otherwise `key` is returned unchanged.
    fn successor(&self, key: &[u8]) -> Vec<u8> {
        let original = InternalKey::decode(key);
        let next_user_key = self.user_comparator.successor(original.user_key.as_ref());

        let fits = next_user_key.len() <= original.user_key.len();
        if !fits
            || self.user_comparator.compare(original.user_key.as_ref(), &next_user_key)
                != Ordering::Less
        {
            return key.to_vec();
        }

        InternalKey::new(
            next_user_key,
            INTERNAL_KEY_SEQ_NUM_MAX,
            InternalKeyKind::Separator,
            INTERNAL_KEY_TIMESTAMP_MAX,
        )
        .encode()
    }
}
// Unit tests for `BytewiseComparator` and `InternalKeyComparator`.
#[cfg(test)]
mod tests {
use rand::{Rng, SeedableRng};
use super::*;
// The first differing byte is incremented and the result is truncated right after it.
#[test]
fn test_find_shortest_separator_basic() {
let cmp = BytewiseComparator::default();
let s1 = b"abc1xyz";
let s2 = b"abc3xy";
let sep = cmp.separator(s1, s2);
assert_eq!(sep, b"abc2");
let sep = cmp.separator(b"abcd", b"abcf");
assert_eq!(sep, b"abce");
}
// Equal inputs come back unchanged.
#[test]
fn test_separator_equal_strings() {
let cmp = BytewiseComparator::default();
let s = b"abcdef";
let sep = cmp.separator(s, s);
assert_eq!(sep, s.to_vec());
}
// When one input is a prefix of the other, the first argument is returned unchanged.
#[test]
fn test_separator_prefix_cases() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"abc", b"abcdef");
assert_eq!(sep, b"abc".to_vec());
let sep = cmp.separator(b"abcdef", b"abc");
assert_eq!(sep, b"abcdef".to_vec());
}
// A start key that sorts after the limit is returned unchanged.
#[test]
fn test_separator_start_greater_than_limit() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"abd", b"abc");
assert_eq!(sep, b"abd".to_vec());
}
// Last bytes one apart leave no room for a separator; two apart yields the byte in between.
#[test]
fn test_separator_adjacent_bytes() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"abc", b"abd");
assert_eq!(sep, b"abc".to_vec());
let sep = cmp.separator(b"abc", b"abe");
assert_eq!(sep, b"abd");
}
// When bumping the differing byte would reach the limit, the next byte of the start key
// that is below 0xff is bumped instead.
#[test]
fn test_separator_scan_forward() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"AA1AAA", b"AA2");
assert_eq!(sep, b"AA1B");
}
// If every byte after the differing one is 0xff, the start key is returned unchanged.
#[test]
fn test_separator_all_0xff_suffix() {
let cmp = BytewiseComparator::default();
let start = &[b'A', b'A', 0x01, 0xff, 0xff, 0xff];
let limit = &[b'A', b'A', 0x02];
let sep = cmp.separator(start, limit);
assert_eq!(sep, start.to_vec());
}
// An empty start key yields an empty separator.
#[test]
fn test_separator_empty_strings() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"", b"");
assert_eq!(sep, b"".to_vec());
let sep = cmp.separator(b"", b"abc");
assert_eq!(sep, b"".to_vec());
}
// Keys differing at the first byte shorten to a single byte.
#[test]
fn test_separator_wide_gap() {
let cmp = BytewiseComparator::default();
let sep = cmp.separator(b"abc", b"zzz");
assert_eq!(sep, b"b");
}
// The successor is the first byte incremented, with the rest dropped.
#[test]
fn test_find_short_successor_basic() {
let cmp = BytewiseComparator::default();
assert_eq!(cmp.successor(b"abcd"), b"b");
assert_eq!(cmp.successor(b"zzzz"), b"{"); }
// A key made only of 0xff bytes has no shorter successor and is returned unchanged.
#[test]
fn test_successor_all_0xff() {
let cmp = BytewiseComparator::default();
let key = vec![0xff, 0xff, 0xff];
let succ = cmp.successor(&key);
assert_eq!(succ, key);
}
// The empty key is returned unchanged.
#[test]
fn test_successor_empty() {
let cmp = BytewiseComparator::default();
let succ = cmp.successor(b"");
assert_eq!(succ, b"".to_vec());
}
// Leading 0xff bytes are kept; the first byte that is not 0xff is incremented.
#[test]
fn test_successor_with_0xff_prefix() {
let cmp = BytewiseComparator::default();
let key = vec![0xff, 0xff, b'a'];
let succ = cmp.successor(&key);
assert_eq!(succ, vec![0xff, 0xff, b'b']);
}
// Single-byte keys, including the 0xfe -> 0xff and unchanged 0xff boundaries.
#[test]
fn test_successor_single_byte() {
let cmp = BytewiseComparator::default();
assert_eq!(cmp.successor(b"a"), b"b");
assert_eq!(cmp.successor(&[0x00]), vec![0x01]);
assert_eq!(cmp.successor(&[0xfe]), vec![0xff]);
assert_eq!(cmp.successor(&[0xff]), vec![0xff]); }
// Randomized invariant check over 1000 generated key pairs, using a fixed seed so runs
// are reproducible.
#[test]
fn test_separator_successor_randomized() {
let cmp = BytewiseComparator::default();
// Bytes near both ends of the u8 range, chosen with extra weight when building s1.
let char_list: [u8; 6] = [0, 1, 2, 253, 254, 255];
let mut rng = rand::rngs::StdRng::seed_from_u64(301);
for _ in 0..1000 {
let size1 = rng.random_range(0..16);
// s2's length is either independent of s1's or within +/-2 of it.
let size2 = if rng.random_bool(0.5) {
rng.random_range(0..16)
} else {
let diff = rng.random_range(0..5) - 2;
let tmp = size1 as i32 + diff;
tmp.max(0) as usize
};
// Build s1 from a mix of arbitrary bytes and entries of `char_list`.
let mut s1 = Vec::with_capacity(size1);
for _ in 0..size1 {
if rng.random_bool(0.5) {
s1.push(rng.random::<u8>());
} else {
s1.push(char_list[rng.random_range(0..char_list.len())]);
}
}
// s2 starts as s1 resized to size2 (zero-padded or truncated) and is then perturbed
// from the last byte backwards.
let mut s2 = s1.clone();
s2.resize(size2, 0);
if !s2.is_empty() {
let mut pos = s2.len() - 1;
loop {
if pos >= s1.len() || rng.random_ratio(1, 4) {
// Positions beyond s1, and a quarter of the rest, get an arbitrary byte.
s2[pos] = rng.random::<u8>();
} else if rng.random_ratio(1, 4) {
// Stop early, leaving the remaining prefix shared with s1.
break;
} else {
// Otherwise nudge the byte by -2..=2 relative to s1, clamped to the u8 range.
let diff = rng.random_range(0..5) - 2;
let s1_char = s1[pos] as i32;
let s2_char = (s1_char + diff).clamp(0, 255) as u8;
s2[pos] = s2_char;
}
if pos == 0 {
break;
}
pos -= 1;
}
}
// Check the pair in both argument orders; the second pass swaps s1 and s2.
for rev in 0..2 {
if rev == 1 {
std::mem::swap(&mut s1, &mut s2);
}
let separator = cmp.separator(&s1, &s2);
if s1 == s2 {
assert_eq!(s1, separator, "Equal strings should return unchanged");
} else if s1 < s2 {
assert!(
s1 <= separator,
"Separator must be >= start. s1={:?}, sep={:?}",
s1,
separator
);
assert!(
separator < s2,
"Separator must be < limit. sep={:?}, s2={:?}",
separator,
s2
);
assert!(
separator.len() <= s1.len().max(s2.len()),
"Separator should not be longer than max(s1, s2)"
);
} else {
assert_eq!(s1, separator, "When s1 > s2, separator should equal s1");
}
}
// s1 here is the swapped value left over from the second pass above.
let succ = cmp.successor(&s1);
assert!(succ >= s1, "Successor must be >= original. s1={:?}, succ={:?}", s1, succ);
}
}
// Separator on low-valued, non-text bytes, including 0x00.
#[test]
fn test_separator_binary_data() {
let cmp = BytewiseComparator::default();
let s1 = &[0x00, 0x01, 0x02];
let s2 = &[0x00, 0x01, 0x05];
let sep = cmp.separator(s1, s2);
assert_eq!(sep, vec![0x00, 0x01, 0x03]);
let s1 = &[0x00, 0x00, 0x00];
let s2 = &[0x00, 0x00, 0x10];
let sep = cmp.separator(s1, s2);
assert_eq!(sep, vec![0x00, 0x00, 0x01]);
}
// A long shared prefix is preserved; only the final differing byte changes.
#[test]
fn test_separator_long_common_prefix() {
let cmp = BytewiseComparator::default();
let s1 = b"aaaaaaaaaaaaaaaaaaaab";
let s2 = b"aaaaaaaaaaaaaaaaaaaad";
let sep = cmp.separator(s1, s2);
assert_eq!(sep, b"aaaaaaaaaaaaaaaaaaaac");
}
// Lexicographic comparison, including prefix and empty-key cases.
#[test]
fn test_compare_basic() {
let cmp = BytewiseComparator::default();
assert_eq!(cmp.compare(b"abc", b"abc"), Ordering::Equal);
assert_eq!(cmp.compare(b"abc", b"abd"), Ordering::Less);
assert_eq!(cmp.compare(b"abd", b"abc"), Ordering::Greater);
assert_eq!(cmp.compare(b"abc", b"abcd"), Ordering::Less);
assert_eq!(cmp.compare(b"abcd", b"abc"), Ordering::Greater);
assert_eq!(cmp.compare(b"", b""), Ordering::Equal);
assert_eq!(cmp.compare(b"", b"a"), Ordering::Less);
}
// The comparator reports its fixed name.
#[test]
fn test_comparator_name() {
let cmp = BytewiseComparator::default();
assert_eq!(cmp.name(), "leveldb.BytewiseComparator");
}
// Helper: encodes an internal key from a user key, sequence number and kind; the fourth
// argument to `InternalKey::new` (presumably the timestamp - confirm) is 0.
fn ikey(user_key: &[u8], seq: u64, kind: InternalKeyKind) -> Vec<u8> {
InternalKey::new(user_key.to_vec(), seq, kind, 0).encode()
}
// Helper: encodes an internal key with the same trailing fields the comparators use for
// shortened keys (max sequence number, `Separator` kind, max timestamp).
fn ikey_max_seq(user_key: &[u8]) -> Vec<u8> {
InternalKey::new(
user_key.to_vec(),
INTERNAL_KEY_SEQ_NUM_MAX,
InternalKeyKind::Separator,
INTERNAL_KEY_TIMESTAMP_MAX,
)
.encode()
}
// With identical user keys the first key is returned unchanged, whatever the sequence
// numbers or kinds are.
#[test]
fn test_internal_key_short_separator_same_user_key() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"foo", 99, InternalKeyKind::Set)
)
);
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"foo", 101, InternalKeyKind::Set)
)
);
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"foo", 100, InternalKeyKind::Set)
)
);
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"foo", 100, InternalKeyKind::Delete)
)
);
}
// A start key whose user key sorts after the limit's is returned unchanged.
#[test]
fn test_internal_key_short_separator_misordered() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"bar", 99, InternalKeyKind::Set)
)
);
}
// Different user keys: the shortened user key is re-encoded with the max-sequence trailer.
#[test]
fn test_internal_key_short_separator_different_user_keys() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let result = cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"hello", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"g"));
let result = cmp.separator(
&ikey(b"ABC1AAAAA", 100, InternalKeyKind::Set),
&ikey(b"ABC2ABB", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"ABC2"));
let result = cmp.separator(
&ikey(b"AAA1AAA", 100, InternalKeyKind::Set),
&ikey(b"AAA2AA", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"AAA2"));
let result = cmp.separator(
&ikey(b"AAA1AAA", 100, InternalKeyKind::Set),
&ikey(b"AAA4", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"AAA2"));
// Bumping '1' would produce the limit's user key itself, so a later byte is bumped.
let result = cmp.separator(
&ikey(b"AAA1AAA", 100, InternalKeyKind::Set),
&ikey(b"AAA2", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"AAA1B"));
let result = cmp.separator(
&ikey(b"AAA1AAA", 100, InternalKeyKind::Set),
&ikey(b"AAA2A", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"AAA2"));
}
// Adjacent user keys of equal length cannot be shortened; the start key is returned unchanged.
#[test]
fn test_internal_key_short_separator_adjacent() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let result = cmp.separator(
&ikey(b"AAA1", 100, InternalKeyKind::Set),
&ikey(b"AAA2", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey(b"AAA1", 100, InternalKeyKind::Set));
}
// One user key being a prefix of the other leaves the start key unchanged.
#[test]
fn test_internal_key_short_separator_prefix_cases() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
assert_eq!(
ikey(b"foo", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foo", 100, InternalKeyKind::Set),
&ikey(b"foobar", 200, InternalKeyKind::Set)
)
);
assert_eq!(
ikey(b"foobar", 100, InternalKeyKind::Set),
cmp.separator(
&ikey(b"foobar", 100, InternalKeyKind::Set),
&ikey(b"foo", 200, InternalKeyKind::Set)
)
);
}
// The successor shortens the user key and re-encodes it with the max-sequence trailer.
#[test]
fn test_internal_key_shortest_successor() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let result = cmp.successor(&ikey(b"foo", 100, InternalKeyKind::Set));
assert_eq!(result, ikey_max_seq(b"g"));
}
// A user key made only of 0xff bytes has no successor; the input is returned unchanged.
#[test]
fn test_internal_key_shortest_successor_all_0xff() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let input = ikey(&[0xff, 0xff], 100, InternalKeyKind::Set);
let result = cmp.successor(&input);
assert_eq!(result, input);
}
// Adjacent user keys leave no room for a separator; the start key is returned unchanged.
#[test]
fn test_internal_key_separator_adjacent_user_keys() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let result = cmp.separator(
&ikey(b"a", 100, InternalKeyKind::Set),
&ikey(b"b", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey(b"a", 100, InternalKeyKind::Set));
let result = cmp.separator(
&ikey(b"aaa", 100, InternalKeyKind::Set),
&ikey(b"aab", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey(b"aaa", 100, InternalKeyKind::Set));
}
// The produced separator carries the max sequence number and lies between start and limit.
#[test]
fn test_internal_key_separator_with_max_seq() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let result = cmp.separator(
&ikey(b"apple", 100, InternalKeyKind::Set),
&ikey(b"banana", 200, InternalKeyKind::Set),
);
assert_eq!(result, ikey_max_seq(b"b"));
let sep_ikey = InternalKey::decode(&result);
assert_eq!(sep_ikey.seq_num(), INTERNAL_KEY_SEQ_NUM_MAX);
assert!(
cmp.compare(&ikey(b"apple", 100, InternalKeyKind::Set), &result) != Ordering::Greater
);
assert!(
cmp.compare(&result, &ikey(b"banana", 200, InternalKeyKind::Set)) == Ordering::Less
);
}
// For equal user keys, higher sequence numbers sort first, so the max-sequence key
// precedes every other entry for that user key.
#[test]
fn test_internal_key_ordering_with_seq_nums() {
let cmp = InternalKeyComparator::new(Arc::new(BytewiseComparator::default()));
let key_100 = ikey(b"foo", 100, InternalKeyKind::Set);
let key_50 = ikey(b"foo", 50, InternalKeyKind::Set);
assert_eq!(
cmp.compare(&key_100, &key_50),
Ordering::Less,
"(foo, 100) should be < (foo, 50) due to descending seq_num"
);
let key_max = InternalKey::new(
b"foo".to_vec(),
INTERNAL_KEY_SEQ_NUM_MAX,
InternalKeyKind::Separator,
INTERNAL_KEY_TIMESTAMP_MAX,
)
.encode();
assert_eq!(
cmp.compare(&key_max, &key_100),
Ordering::Less,
"(foo, MAX) should be < (foo, 100)"
);
assert_eq!(
cmp.compare(&key_max, &key_50),
Ordering::Less,
"(foo, MAX) should be < (foo, 50)"
);
}
}