use crate::types::SequenceNumber;
/// A deletion marker covering the half-open user-key range `[begin, end)`.
pub(crate) struct RangeTombstone {
    /// Inclusive lower bound of the covered range.
    pub begin: Vec<u8>,
    /// Exclusive upper bound of the covered range.
    pub end: Vec<u8>,
    /// Sequence number carried by the tombstone.
    pub seq: SequenceNumber,
}

/// Collects range tombstones and answers whether a given entry is hidden by one.
///
/// Small sets (at most four tombstones) are scanned linearly on every query.
/// Larger sets are sorted by `begin` and queried with a forward sweep that
/// permanently drops tombstones once a queried key reaches their `end`.
/// The sweep therefore only gives correct answers for keys presented in
/// non-decreasing order; call [`RangeTombstoneTracker::reset`] before
/// starting a new pass.
pub(crate) struct RangeTombstoneTracker {
    /// Every tombstone registered through `add`.
    tombstones: Vec<RangeTombstone>,
    /// `true` once `tombstones` is ordered by `begin`; cleared again by `add`.
    sorted: bool,
    /// Sweep cursor: index of the first tombstone not yet admitted to `active`.
    next_idx: usize,
    /// Indices into `tombstones` that the sweep still considers candidates.
    active: Vec<usize>,
}

impl RangeTombstoneTracker {
    /// Largest tombstone count for which `is_deleted` uses the linear scan.
    const LINEAR_SCAN_MAX: usize = 4;

    /// Creates a tracker with no tombstones.
    pub fn new() -> Self {
        Self {
            active: Vec::new(),
            next_idx: 0,
            sorted: false,
            tombstones: Vec::new(),
        }
    }

    /// Registers a tombstone over `[begin, end)` with sequence number `seq`.
    ///
    /// Marks the set as unsorted, so the next sweep query re-sorts and
    /// restarts from the beginning.
    pub fn add(&mut self, begin: Vec<u8>, end: Vec<u8>, seq: SequenceNumber) {
        let tombstone = RangeTombstone { begin, end, seq };
        self.sorted = false;
        self.tombstones.push(tombstone);
    }

    /// Rewinds the sweep state, sorting the tombstones by `begin` if needed.
    pub fn reset(&mut self) {
        self.active.clear();
        self.next_idx = 0;
        if self.sorted {
            return;
        }
        // Sorting zero or one element is a no-op, so no length guard is needed.
        self.tombstones.sort_by(|lhs, rhs| lhs.begin.cmp(&rhs.begin));
        self.sorted = true;
    }

    /// Returns `true` when some tombstone covers `user_key`, is visible at
    /// `snapshot` (`tombstone.seq <= snapshot`) and is newer than the entry
    /// (`tombstone.seq > seq`).
    ///
    /// With more than four tombstones this advances the forward sweep, so
    /// successive calls must use non-decreasing keys until `reset` is called.
    pub fn is_deleted(
        &mut self,
        user_key: &[u8],
        seq: SequenceNumber,
        snapshot: SequenceNumber,
    ) -> bool {
        if self.tombstones.len() <= Self::LINEAR_SCAN_MAX {
            return self.linear_check(user_key, seq, snapshot);
        }
        if !self.sorted {
            self.reset();
        }

        let ranges = &self.tombstones;

        // Admit every not-yet-seen tombstone that starts at or before the key.
        let admitted = ranges[self.next_idx..]
            .iter()
            .take_while(|rt| rt.begin.as_slice() <= user_key)
            .count();
        let cursor = self.next_idx + admitted;
        self.active.extend(self.next_idx..cursor);
        self.next_idx = cursor;

        // Forget tombstones that end at or before the key.
        self.active
            .retain(|&i| user_key < ranges[i].end.as_slice());

        self.active.iter().any(|&i| {
            let rt = &ranges[i];
            seq < rt.seq && rt.seq <= snapshot
        })
    }

    /// Stateless scan over all tombstones; same predicate as the sweep.
    fn linear_check(&self, user_key: &[u8], seq: SequenceNumber, snapshot: SequenceNumber) -> bool {
        self.tombstones.iter().any(|rt| {
            let visible = rt.seq <= snapshot;
            let covers = rt.begin.as_slice() <= user_key && user_key < rt.end.as_slice();
            visible && covers && seq < rt.seq
        })
    }

    /// Returns `true` when no tombstone has been added.
    pub fn is_empty(&self) -> bool {
        self.tombstones.is_empty()
    }
}
struct TombstoneFragment {
begin: Vec<u8>,
end: Vec<u8>,
seq_levels: Vec<(SequenceNumber, usize)>,
}
pub(crate) struct FragmentedRangeTombstoneList {
fragments: Vec<TombstoneFragment>,
}
impl FragmentedRangeTombstoneList {
pub fn empty() -> Self {
Self {
fragments: Vec::new(),
}
}
pub fn new(raw: Vec<(Vec<u8>, Vec<u8>, SequenceNumber)>) -> Self {
let with_levels: Vec<_> = raw.into_iter().map(|(b, e, s)| (b, e, s, 0usize)).collect();
Self::new_with_levels(with_levels)
}
pub fn new_with_levels(mut raw: Vec<(Vec<u8>, Vec<u8>, SequenceNumber, usize)>) -> Self {
if raw.is_empty() {
return Self::empty();
}
let mut boundaries: Vec<Vec<u8>> = Vec::with_capacity(raw.len() * 2);
for (begin, end, _, _) in &raw {
boundaries.push(begin.clone());
boundaries.push(end.clone());
}
boundaries.sort();
boundaries.dedup();
raw.sort_by(|a, b| a.0.cmp(&b.0));
let mut fragments = Vec::new();
let mut tomb_idx = 0;
let mut active: Vec<usize> = Vec::new();
for w in boundaries.windows(2) {
let b_start = &w[0];
let b_end = &w[1];
while tomb_idx < raw.len() && raw[tomb_idx].0.as_slice() <= b_start.as_slice() {
active.push(tomb_idx);
tomb_idx += 1;
}
active.retain(|&idx| raw[idx].1.as_slice() > b_start.as_slice());
if active.is_empty() {
continue;
}
let mut seq_levels: Vec<(SequenceNumber, usize)> =
active.iter().map(|&idx| (raw[idx].2, raw[idx].3)).collect();
seq_levels.sort_unstable_by(|a, b| b.0.cmp(&a.0));
seq_levels.dedup_by_key(|sl| sl.0);
fragments.push(TombstoneFragment {
begin: b_start.clone(),
end: b_end.clone(),
seq_levels,
});
}
Self { fragments }
}
pub fn max_covering_tombstone_seq(
&self,
user_key: &[u8],
snapshot: SequenceNumber,
) -> SequenceNumber {
self.max_covering_tombstone_seq_for_level(user_key, snapshot, None)
}
pub fn max_covering_tombstone_seq_for_level(
&self,
user_key: &[u8],
snapshot: SequenceNumber,
source_level: Option<usize>,
) -> SequenceNumber {
if self.fragments.is_empty() {
return 0;
}
let idx = self
.fragments
.partition_point(|f| f.begin.as_slice() <= user_key);
if idx == 0 {
return 0;
}
let frag = &self.fragments[idx - 1];
if user_key >= frag.end.as_slice() {
return 0;
}
for &(seq, level) in &frag.seq_levels {
if seq > snapshot {
continue;
}
if let Some(src_lvl) = source_level
&& level >= src_lvl
{
continue;
}
return seq;
}
0
}
pub fn is_empty(&self) -> bool {
self.fragments.is_empty()
}
pub fn tombstones(&self) -> Vec<(Vec<u8>, Vec<u8>, SequenceNumber)> {
let mut result = Vec::new();
for frag in &self.fragments {
for &(seq, _level) in &frag.seq_levels {
result.push((frag.begin.clone(), frag.end.clone(), seq));
}
}
result
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A tracker without tombstones never reports a deletion.
    #[test]
    fn test_empty_tracker() {
        let mut tracker = RangeTombstoneTracker::new();
        assert!(!tracker.is_deleted(b"key", 1, 10));
    }

    /// Covers the key/seq predicate for a single tombstone.
    #[test]
    fn test_single_tombstone() {
        let mut tracker = RangeTombstoneTracker::new();
        tracker.add(b"aaa".to_vec(), b"zzz".to_vec(), 5);
        tracker.reset();
        assert!(tracker.is_deleted(b"bbb", 3, 10));
        // Entry newer than the tombstone survives.
        assert!(!tracker.is_deleted(b"bbb", 6, 10));
        // Key sorts before the range.
        assert!(!tracker.is_deleted(b"000", 3, 10));
    }

    /// A tombstone does not delete an entry carrying the same seq.
    #[test]
    fn test_same_seq_not_deleted() {
        let mut tracker = RangeTombstoneTracker::new();
        tracker.add(b"a".to_vec(), b"z".to_vec(), 5);
        tracker.reset();
        assert!(!tracker.is_deleted(b"m", 5, 10));
        assert!(tracker.is_deleted(b"m", 4, 10));
    }

    /// Ascending keys across two disjoint ranges. With only two tombstones
    /// this goes through the linear scan, not the sweep.
    #[test]
    fn test_forward_sweep() {
        let mut tracker = RangeTombstoneTracker::new();
        tracker.add(b"b".to_vec(), b"d".to_vec(), 5);
        tracker.add(b"f".to_vec(), b"h".to_vec(), 5);
        tracker.reset();
        assert!(!tracker.is_deleted(b"a", 1, 10));
        assert!(tracker.is_deleted(b"b", 1, 10));
        assert!(tracker.is_deleted(b"c", 1, 10));
        assert!(!tracker.is_deleted(b"d", 1, 10));
        assert!(!tracker.is_deleted(b"e", 1, 10));
        assert!(tracker.is_deleted(b"f", 1, 10));
        assert!(tracker.is_deleted(b"g", 1, 10));
        assert!(!tracker.is_deleted(b"h", 1, 10));
    }

    /// With more than four tombstones `is_deleted` sweeps; for ascending keys
    /// it must agree with the stateless linear scan.
    #[test]
    fn test_sweep_agrees_with_linear_check() {
        let mut tracker = RangeTombstoneTracker::new();
        tracker.add(b"a".to_vec(), b"e".to_vec(), 3);
        tracker.add(b"c".to_vec(), b"g".to_vec(), 7);
        tracker.add(b"b".to_vec(), b"d".to_vec(), 9);
        tracker.add(b"f".to_vec(), b"k".to_vec(), 4);
        tracker.add(b"h".to_vec(), b"j".to_vec(), 12);
        tracker.add(b"m".to_vec(), b"p".to_vec(), 6);
        tracker.reset();
        for byte in b'a'..=b'q' {
            let key = [byte];
            for seq in [0, 3, 5, 8] {
                for snapshot in [2, 6, 10, 20] {
                    let expected = tracker.linear_check(&key, seq, snapshot);
                    assert_eq!(
                        tracker.is_deleted(&key, seq, snapshot),
                        expected,
                        "key {:?} seq {} snapshot {}",
                        key,
                        seq,
                        snapshot
                    );
                }
            }
        }
    }

    /// 100 disjoint ranges `[key_2i, key_2i+1)`: even keys are covered, odd
    /// keys sit on an exclusive end bound and must survive.
    #[test]
    fn test_many_tombstones() {
        let mut tracker = RangeTombstoneTracker::new();
        for i in 0..100u32 {
            let begin = format!("key_{:04}", i * 2);
            let end = format!("key_{:04}", i * 2 + 1);
            tracker.add(begin.into_bytes(), end.into_bytes(), 5);
        }
        tracker.reset();
        for i in 0..200u32 {
            let key = format!("key_{:04}", i);
            let deleted = tracker.is_deleted(key.as_bytes(), 1, 10);
            if i % 2 == 0 {
                assert!(deleted, "key_{:04} should be deleted", i);
            } else {
                assert!(!deleted, "key_{:04} should NOT be deleted", i);
            }
        }
    }

    /// Empty lists cover nothing, however they are constructed.
    #[test]
    fn test_fragmented_empty() {
        let list = FragmentedRangeTombstoneList::empty();
        assert!(list.is_empty());
        assert_eq!(list.max_covering_tombstone_seq(b"any", 100), 0);
        let list2 = FragmentedRangeTombstoneList::new(vec![]);
        assert!(list2.is_empty());
    }

    /// Range bounds and snapshot visibility for one tombstone.
    #[test]
    fn test_fragmented_single_tombstone() {
        let list = FragmentedRangeTombstoneList::new(vec![(b"a".to_vec(), b"z".to_vec(), 5)]);
        assert!(!list.is_empty());
        assert_eq!(list.max_covering_tombstone_seq(b"a", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"m", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"y", 10), 5);
        // End bound is exclusive.
        assert_eq!(list.max_covering_tombstone_seq(b"z", 10), 0);
        // Key before the range.
        assert_eq!(list.max_covering_tombstone_seq(b"\0", 10), 0);
        // Tombstone is newer than the snapshot.
        assert_eq!(list.max_covering_tombstone_seq(b"m", 3), 0);
        assert_eq!(list.max_covering_tombstone_seq(b"m", 5), 5);
    }

    /// The returned seq is meant to be compared with `>` against an entry's
    /// seq: an entry at seq 5 is not shadowed, one at seq 4 is.
    #[test]
    fn test_fragmented_same_seq_not_deleted() {
        let list = FragmentedRangeTombstoneList::new(vec![(b"a".to_vec(), b"z".to_vec(), 5)]);
        let max_seq = list.max_covering_tombstone_seq(b"m", 10);
        assert_eq!(max_seq, 5);
        assert!(max_seq <= 5);
        assert!(max_seq > 4);
    }

    /// Partially overlapping ranges: the overlap reports the newer seq unless
    /// the snapshot hides it.
    #[test]
    fn test_fragmented_overlapping() {
        let list = FragmentedRangeTombstoneList::new(vec![
            (b"a".to_vec(), b"m".to_vec(), 5),
            (b"f".to_vec(), b"z".to_vec(), 8),
        ]);
        assert_eq!(list.max_covering_tombstone_seq(b"a", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"c", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"e", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"f", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"h", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"l", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"m", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"p", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"y", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"z", 10), 0);
        // Snapshot 6 hides seq 8: the overlap falls back to seq 5 ...
        assert_eq!(list.max_covering_tombstone_seq(b"h", 6), 5);
        // ... and the part covered only by seq 8 is uncovered.
        assert_eq!(list.max_covering_tombstone_seq(b"p", 6), 0);
    }

    /// A newer range nested inside an older one.
    #[test]
    fn test_fragmented_nested() {
        let list = FragmentedRangeTombstoneList::new(vec![
            (b"a".to_vec(), b"z".to_vec(), 5),
            (b"d".to_vec(), b"f".to_vec(), 8),
        ]);
        assert_eq!(list.max_covering_tombstone_seq(b"b", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"d", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"e", 10), 8);
        // `f` is the exclusive end of the inner range.
        assert_eq!(list.max_covering_tombstone_seq(b"f", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"x", 10), 5);
    }

    /// Ranges that touch at `c`: the shared key belongs to the second range.
    #[test]
    fn test_fragmented_adjacent() {
        let list = FragmentedRangeTombstoneList::new(vec![
            (b"a".to_vec(), b"c".to_vec(), 5),
            (b"c".to_vec(), b"f".to_vec(), 8),
        ]);
        assert_eq!(list.max_covering_tombstone_seq(b"a", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"b", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"c", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"d", 10), 8);
        assert_eq!(list.max_covering_tombstone_seq(b"f", 10), 0);
    }

    /// 100 disjoint ranges: even keys covered, odd keys not.
    #[test]
    fn test_fragmented_many_tombstones() {
        let raw: Vec<_> = (0..100u32)
            .map(|i| {
                let begin = format!("key_{:04}", i * 2);
                let end = format!("key_{:04}", i * 2 + 1);
                (begin.into_bytes(), end.into_bytes(), 5u64)
            })
            .collect();
        let list = FragmentedRangeTombstoneList::new(raw);
        for i in 0..200u32 {
            let key = format!("key_{:04}", i);
            let max_seq = list.max_covering_tombstone_seq(key.as_bytes(), 10);
            if i % 2 == 0 {
                assert_eq!(max_seq, 5, "key_{:04} should be covered", i);
            } else {
                assert_eq!(max_seq, 0, "key_{:04} should NOT be covered", i);
            }
        }
    }

    /// Overlapping ranges that share one seq.
    #[test]
    fn test_fragmented_duplicate_seqs() {
        let list = FragmentedRangeTombstoneList::new(vec![
            (b"a".to_vec(), b"d".to_vec(), 5),
            (b"c".to_vec(), b"f".to_vec(), 5),
        ]);
        assert_eq!(list.max_covering_tombstone_seq(b"b", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"c", 10), 5);
        assert_eq!(list.max_covering_tombstone_seq(b"e", 10), 5);
    }

    /// `source_level` hides tombstones recorded at that level or higher.
    #[test]
    fn test_fragmented_level_filter() {
        let list = FragmentedRangeTombstoneList::new_with_levels(vec![(
            b"a".to_vec(),
            b"z".to_vec(),
            5,
            2,
        )]);
        assert_eq!(list.max_covering_tombstone_seq_for_level(b"m", 10, None), 5);
        assert_eq!(list.max_covering_tombstone_seq_for_level(b"m", 10, Some(3)), 5);
        assert_eq!(list.max_covering_tombstone_seq_for_level(b"m", 10, Some(2)), 0);
        assert_eq!(list.max_covering_tombstone_seq_for_level(b"m", 10, Some(1)), 0);
    }
}