use super::{RangeTombstone, Record};
pub struct VisibilityFilter<I>
where
I: Iterator<Item = Record>,
{
input: I,
current_key: Option<Vec<u8>>,
active_ranges: Vec<RangeTombstone>,
}
impl<I> VisibilityFilter<I>
where
I: Iterator<Item = Record>,
{
pub fn new(input: I) -> Self {
Self {
input,
current_key: None,
active_ranges: Vec::new(),
}
}
}
impl<I> Iterator for VisibilityFilter<I>
where
    I: Iterator<Item = Record>,
{
    /// A visible `(key, value)` pair.
    type Item = (Vec<u8>, Vec<u8>);

    /// Pulls records from the input until a visible `Put` is found.
    ///
    /// * `RangeDelete` records are remembered in `active_ranges` and never
    ///   yielded.
    /// * `Delete` records are never yielded; they mark their key as current
    ///   so that following `Put` records for the same key are skipped.
    /// * A `Put` is skipped when its key equals the current key, or when some
    ///   remembered range tombstone has `start <= key < end` (byte-wise
    ///   comparison) and a strictly greater `lsn` than the `Put`.
    ///
    /// Returns `None` once the input yields no further record.
    ///
    /// NOTE(review): hiding older versions relies on all records for one key
    /// being adjacent with the visible one first; presumably the upstream
    /// merge orders by key and then newest-first. Confirm against the
    /// producer.
    ///
    /// NOTE(review): a range tombstone only affects `Put` records that arrive
    /// after it in the stream, and `active_ranges` is never pruned, so it
    /// grows for the lifetime of the filter. Pruning would need a key-order
    /// guarantee that is not visible here.
    fn next(&mut self) -> Option<Self::Item> {
        for record in self.input.by_ref() {
            match record {
                Record::RangeDelete {
                    start,
                    end,
                    lsn,
                    timestamp,
                } => {
                    self.active_ranges.push(RangeTombstone {
                        start,
                        end,
                        lsn,
                        timestamp,
                    });
                }
                Record::Delete { key, .. } => {
                    // The key is already owned here; move it instead of
                    // allocating a copy.
                    self.current_key = Some(key);
                }
                Record::Put {
                    key, value, lsn, ..
                } => {
                    // A record for this key was already handled, so this one
                    // is skipped.
                    if self.current_key.as_deref() == Some(key.as_slice()) {
                        continue;
                    }
                    let shadowed = self.active_ranges.iter().any(|r| {
                        r.start.as_slice() <= key.as_slice()
                            && key.as_slice() < r.end.as_slice()
                            && r.lsn > lsn
                    });
                    if shadowed {
                        // Nothing is returned, so the key can be moved into
                        // the marker without cloning.
                        self.current_key = Some(key);
                        continue;
                    }
                    // The key is needed both as the marker and in the result.
                    self.current_key = Some(key.clone());
                    return Some((key, value));
                }
            }
        }
        None
    }

    /// The filter may drop any number of records, so the lower bound is zero;
    /// it can never yield more items than the input has records left.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.input.size_hint();
        (0, upper)
    }
}