use crate::{SeqNo, UserKey, comparator::UserComparator};
use std::cmp::Reverse;
/// A deletion marker covering the half-open key range `[start, end)` at a
/// single sequence number.
///
/// Bounds semantics are defined by `contains_key_with`: the start bound is
/// inclusive, the end bound exclusive.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RangeTombstone {
    // Inclusive start key of the covered range.
    pub start: UserKey,
    // Exclusive end key of the covered range.
    pub end: UserKey,
    // Sequence number at which the tombstone was written; only reads at a
    // strictly higher sequence number can see it (see `visible_at`).
    pub seqno: SeqNo,
}
impl RangeTombstone {
    /// Builds a tombstone deleting every key in the half-open range
    /// `[start, end)` at sequence number `seqno`.
    #[must_use]
    pub fn new(start: UserKey, end: UserKey, seqno: SeqNo) -> Self {
        Self { start, end, seqno }
    }

    /// Whether `key` lies inside `[start, end)` under the default bytewise
    /// comparator. The start bound is inclusive, the end bound exclusive.
    #[must_use]
    pub fn contains_key(&self, key: &[u8]) -> bool {
        self.contains_key_with(key, &crate::comparator::DefaultUserComparator)
    }

    /// Comparator-aware variant of [`Self::contains_key`].
    #[must_use]
    pub fn contains_key_with(&self, key: &[u8], comparator: &dyn UserComparator) -> bool {
        // start <= key (inclusive lower bound)
        let at_or_after_start = !comparator.compare(&self.start, key).is_gt();
        // key < end (exclusive upper bound)
        let before_end = comparator.compare(key, &self.end).is_lt();
        at_or_after_start && before_end
    }

    /// Whether a read at `read_seqno` can observe this tombstone. Only reads
    /// at a strictly higher sequence number see it.
    #[must_use]
    pub fn visible_at(&self, read_seqno: SeqNo) -> bool {
        read_seqno > self.seqno
    }

    /// Whether this tombstone hides the key/value pair `(key, kv_seqno)` from
    /// a read at `read_seqno`, using the default bytewise comparator.
    #[must_use]
    pub fn should_suppress(&self, key: &[u8], kv_seqno: SeqNo, read_seqno: SeqNo) -> bool {
        let comparator = &crate::comparator::DefaultUserComparator;
        self.should_suppress_with(key, kv_seqno, read_seqno, comparator)
    }

    /// Comparator-aware variant of [`Self::should_suppress`]: the tombstone
    /// must be visible to the read, cover `key`, and be strictly newer than
    /// the key/value pair it suppresses.
    #[must_use]
    pub fn should_suppress_with(
        &self,
        key: &[u8],
        kv_seqno: SeqNo,
        read_seqno: SeqNo,
        comparator: &dyn UserComparator,
    ) -> bool {
        if !self.visible_at(read_seqno) {
            return false;
        }
        if !self.contains_key_with(key, comparator) {
            return false;
        }
        kv_seqno < self.seqno
    }

    /// Clips this tombstone to the window bounded by `min` and `max` under
    /// the default bytewise comparator; `None` if the clipped range is empty.
    #[must_use]
    pub fn intersect_opt(&self, min: &[u8], max: &[u8]) -> Option<Self> {
        self.intersect_opt_with(min, max, &crate::comparator::DefaultUserComparator)
    }

    /// Comparator-aware variant of [`Self::intersect_opt`]. The clipped
    /// tombstone keeps this tombstone's `seqno`.
    #[must_use]
    pub fn intersect_opt_with(
        &self,
        min: &[u8],
        max: &[u8],
        comparator: &dyn UserComparator,
    ) -> Option<Self> {
        // Tighter of the two lower bounds.
        let start = if comparator.compare(&self.start, min).is_gt() {
            self.start.as_ref()
        } else {
            min
        };
        // Tighter of the two upper bounds.
        let end = if comparator.compare(&self.end, max).is_lt() {
            self.end.as_ref()
        } else {
            max
        };
        // Only a non-empty clipped range is returned.
        comparator.compare(start, end).is_lt().then(|| Self {
            start: UserKey::from(start),
            end: UserKey::from(end),
            seqno: self.seqno,
        })
    }

    /// Whether `[start, end)` contains the closed key range `[min, max]`
    /// under the default bytewise comparator.
    #[must_use]
    pub fn fully_covers(&self, min: &[u8], max: &[u8]) -> bool {
        self.fully_covers_with(min, max, &crate::comparator::DefaultUserComparator)
    }

    /// Comparator-aware variant of [`Self::fully_covers`]: requires
    /// `start <= min` and `max < end` (the end bound is exclusive, so `max`
    /// itself must still fall strictly before it).
    #[must_use]
    pub fn fully_covers_with(
        &self,
        min: &[u8],
        max: &[u8],
        comparator: &dyn UserComparator,
    ) -> bool {
        let starts_at_or_before_min = !comparator.compare(&self.start, min).is_gt();
        let ends_past_max = comparator.compare(max, &self.end).is_lt();
        starts_at_or_before_min && ends_past_max
    }

    /// Total order under a custom comparator: start key ascending, then
    /// sequence number descending (newest first), then end key ascending.
    /// Mirrors the [`Ord`] impl, which uses the default byte order.
    #[must_use]
    pub fn cmp_with_comparator(
        &self,
        other: &Self,
        comparator: &dyn UserComparator,
    ) -> std::cmp::Ordering {
        let by_start = comparator.compare(&self.start, &other.start);
        by_start
            .then_with(|| other.seqno.cmp(&self.seqno))
            .then_with(|| comparator.compare(&self.end, &other.end))
    }
}
impl Ord for RangeTombstone {
    /// Orders tombstones by start key ascending, then sequence number
    /// descending (newest first), then end key ascending — the same order
    /// `cmp_with_comparator` produces under the default byte comparator.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = (&self.start, Reverse(self.seqno), &self.end);
        let rhs = (&other.start, Reverse(other.seqno), &other.end);
        lhs.cmp(&rhs)
    }
}
impl PartialOrd for RangeTombstone {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// A range tombstone known to cover some region of interest, carried around
/// for table-skip checks (see `covers_table_with`).
///
/// Fields mirror [`RangeTombstone`]; it can be built from one via `From`.
#[derive(Clone, Debug)]
pub struct CoveringRt {
    // Inclusive start key of the covered range.
    pub start: UserKey,
    // Exclusive end key of the covered range.
    pub end: UserKey,
    // Sequence number of the tombstone.
    pub seqno: SeqNo,
}
impl CoveringRt {
    /// Whether this tombstone hides an entire table, using the default
    /// bytewise comparator. See [`Self::covers_table_with`] for the exact
    /// conditions.
    #[must_use]
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "wired up in table-skip optimization")
    )]
    pub fn covers_table(&self, table_min: &[u8], table_max: &[u8], table_max_seqno: SeqNo) -> bool {
        self.covers_table_with(
            table_min,
            table_max,
            table_max_seqno,
            &crate::comparator::DefaultUserComparator,
        )
    }

    /// Comparator-aware variant of [`Self::covers_table`]. All three
    /// conditions must hold:
    /// - `start <= table_min` — the tombstone begins at or before the table,
    /// - `table_max < end` — the exclusive end extends past the table, and
    /// - `seqno > table_max_seqno` — the tombstone is strictly newer than
    ///   every entry in the table.
    #[must_use] // added for consistency with every other predicate in this file
    pub fn covers_table_with(
        &self,
        table_min: &[u8],
        table_max: &[u8],
        table_max_seqno: SeqNo,
        comparator: &dyn UserComparator,
    ) -> bool {
        comparator.compare(&self.start, table_min) != std::cmp::Ordering::Greater
            && comparator.compare(table_max, &self.end) == std::cmp::Ordering::Less
            && self.seqno > table_max_seqno
    }
}
impl From<&RangeTombstone> for CoveringRt {
fn from(rt: &RangeTombstone) -> Self {
Self {
start: rt.start.clone(),
end: rt.end.clone(),
seqno: rt.seqno,
}
}
}
#[must_use]
/// Returns the smallest key strictly greater than `key` while keeping the
/// result within a `u16::MAX`-byte length budget, or `None` if no such key
/// exists (i.e. `key` is `u16::MAX` bytes of `0xFF`).
///
/// Below the cap, the immediate successor is `key` with a `0x00` appended.
/// At the cap, the last byte that can be incremented is bumped and everything
/// after it is dropped, yielding the smallest in-budget successor.
pub fn upper_bound_exclusive(key: &[u8]) -> Option<UserKey> {
    if key.len() < usize::from(u16::MAX) {
        // Room to grow: appending a zero byte gives the immediate successor.
        let mut successor = Vec::with_capacity(key.len() + 1);
        successor.extend_from_slice(key);
        successor.push(0x00);
        return Some(UserKey::from(successor));
    }
    // At the length cap: bump the last incrementable byte and drop the tail.
    // If every byte is 0xFF there is no in-budget successor.
    let pos = key.iter().rposition(|byte| *byte < 0xFF)?;
    let mut successor = key[..=pos].to_vec();
    successor[pos] += 1;
    Some(UserKey::from(successor))
}
#[cfg(test)]
#[expect(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "tests use unwrap/expect on controlled fixtures for brevity"
)]
mod tests {
    use super::*;

    /// Fixture helper: a tombstone over `[start, end)` at `seqno`.
    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
    }

    // --- contains_key: inclusive start, exclusive end ---

    #[test]
    fn contains_key_inclusive_start() {
        let t = rt(b"b", b"d", 10);
        assert!(t.contains_key(b"b"));
    }

    #[test]
    fn contains_key_exclusive_end() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.contains_key(b"d"));
    }

    #[test]
    fn contains_key_middle() {
        let t = rt(b"b", b"d", 10);
        assert!(t.contains_key(b"c"));
    }

    #[test]
    fn contains_key_before_start() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.contains_key(b"a"));
    }

    // --- visibility: only strictly newer reads see a tombstone ---

    #[test]
    fn not_visible_at_equal() {
        let t = rt(b"a", b"z", 10);
        assert!(!t.visible_at(10));
    }

    #[test]
    fn visible_at_higher() {
        let t = rt(b"a", b"z", 10);
        assert!(t.visible_at(20));
    }

    #[test]
    fn not_visible_at_lower() {
        let t = rt(b"a", b"z", 10);
        assert!(!t.visible_at(9));
    }

    // --- suppression: visible AND covering AND strictly newer than the KV ---

    #[test]
    fn should_suppress_yes() {
        let t = rt(b"b", b"d", 10);
        assert!(t.should_suppress(b"c", 5, 11));
    }

    #[test]
    fn should_suppress_no_at_equal_seqno() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.should_suppress(b"c", 5, 10));
    }

    #[test]
    fn should_suppress_no_newer_kv() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.should_suppress(b"c", 15, 20));
    }

    #[test]
    fn should_suppress_no_not_visible() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.should_suppress(b"c", 5, 9));
    }

    #[test]
    fn should_suppress_no_outside_range() {
        let t = rt(b"b", b"d", 10);
        assert!(!t.should_suppress(b"e", 5, 11));
    }

    // --- ordering: start asc, seqno desc, end asc ---

    #[test]
    fn ordering_by_start_asc() {
        let a = rt(b"a", b"z", 10);
        let b = rt(b"b", b"z", 10);
        assert!(a < b);
    }

    #[test]
    fn ordering_by_seqno_desc() {
        let a = rt(b"a", b"z", 20);
        let b = rt(b"a", b"z", 10);
        assert!(a < b);
    }

    #[test]
    fn ordering_by_end_asc_tiebreaker() {
        let a = rt(b"a", b"m", 10);
        let b = rt(b"a", b"z", 10);
        assert!(a < b);
    }

    #[test]
    fn cmp_with_comparator_matches_default_ord() {
        // The comparator-parameterized order must agree with `Ord` when the
        // default bytewise comparator is used.
        let a = rt(b"a", b"z", 20);
        let b = rt(b"a", b"z", 10);
        let cmp = a.cmp_with_comparator(&b, &crate::comparator::DefaultUserComparator);
        assert_eq!(cmp, a.cmp(&b));
    }

    // --- intersection / clipping ---

    #[test]
    fn intersect_overlap() {
        let t = rt(b"b", b"y", 10);
        let clipped = t.intersect_opt(b"d", b"g").unwrap();
        assert_eq!(clipped.start.as_ref(), b"d");
        assert_eq!(clipped.end.as_ref(), b"g");
        assert_eq!(clipped.seqno, 10);
    }

    #[test]
    fn intersect_no_overlap() {
        let t = rt(b"b", b"d", 10);
        assert!(t.intersect_opt(b"e", b"g").is_none());
    }

    #[test]
    fn intersect_partial_left() {
        let t = rt(b"b", b"f", 10);
        let clipped = t.intersect_opt(b"a", b"d").unwrap();
        assert_eq!(clipped.start.as_ref(), b"b");
        assert_eq!(clipped.end.as_ref(), b"d");
    }

    #[test]
    fn intersect_partial_right() {
        let t = rt(b"b", b"f", 10);
        let clipped = t.intersect_opt(b"d", b"z").unwrap();
        assert_eq!(clipped.start.as_ref(), b"d");
        assert_eq!(clipped.end.as_ref(), b"f");
    }

    #[test]
    fn intersect_exact_bounds() {
        // Window exactly matching the tombstone yields the tombstone itself.
        let t = rt(b"b", b"d", 10);
        let clipped = t.intersect_opt(b"b", b"d").unwrap();
        assert_eq!(clipped.start.as_ref(), b"b");
        assert_eq!(clipped.end.as_ref(), b"d");
    }

    #[test]
    fn intersect_empty_window_is_none() {
        // A zero-width window inside the tombstone clips to an empty range.
        let t = rt(b"b", b"d", 10);
        assert!(t.intersect_opt(b"c", b"c").is_none());
    }

    // --- full coverage of a closed [min, max] range ---

    #[test]
    fn fully_covers_yes() {
        let t = rt(b"a", b"z", 10);
        assert!(t.fully_covers(b"b", b"y"));
    }

    #[test]
    fn fully_covers_exact_start() {
        let t = rt(b"a", b"z", 10);
        assert!(t.fully_covers(b"a", b"y"));
    }

    #[test]
    fn fully_covers_no_end_equal() {
        let t = rt(b"a", b"z", 10);
        assert!(!t.fully_covers(b"a", b"z"));
    }

    #[test]
    fn fully_covers_no_start_before() {
        let t = rt(b"b", b"z", 10);
        assert!(!t.fully_covers(b"a", b"y"));
    }

    // --- CoveringRt table-skip predicate ---

    #[test]
    fn covering_rt_covers_table() {
        let crt = CoveringRt {
            start: UserKey::from(b"a" as &[u8]),
            end: UserKey::from(b"z" as &[u8]),
            seqno: 100,
        };
        assert!(crt.covers_table(b"b", b"y", 50));
    }

    #[test]
    fn covering_rt_no_cover_seqno_too_low() {
        let crt = CoveringRt {
            start: UserKey::from(b"a" as &[u8]),
            end: UserKey::from(b"z" as &[u8]),
            seqno: 50,
        };
        assert!(!crt.covers_table(b"b", b"y", 100));
    }

    // --- upper_bound_exclusive successor computation ---

    #[test]
    fn upper_bound_exclusive_appends_zero() {
        let key = b"hello";
        let result = upper_bound_exclusive(key).unwrap();
        assert_eq!(result.as_ref(), b"hello\x00");
    }

    #[test]
    fn upper_bound_exclusive_empty_key() {
        // The empty key is below the length cap; its successor is a single
        // zero byte.
        let result = upper_bound_exclusive(b"").unwrap();
        assert_eq!(result.as_ref(), b"\x00");
    }

    #[test]
    fn upper_bound_exclusive_max_length_non_max_key_has_successor() {
        let key = vec![0xAA; usize::from(u16::MAX)];
        let successor = upper_bound_exclusive(&key).expect("non-max key should have successor");
        assert!(key.as_slice() < successor.as_ref());
        assert!(u16::try_from(successor.len()).is_ok());
    }

    #[test]
    fn upper_bound_exclusive_true_max_returns_none() {
        let key = vec![0xFF; usize::from(u16::MAX)];
        assert!(upper_bound_exclusive(&key).is_none());
    }
}