/// A single logged mutation: a point write, a point delete, or a range delete.
///
/// Every variant carries an `lsn` and a `timestamp`.
/// NOTE(review): `lsn` is presumably a log sequence number and `timestamp`'s
/// unit is not visible here — confirm against the writer.
#[derive(Clone, Debug)]
pub enum Record {
    /// Stores `value` under `key`.
    Put {
        key: Vec<u8>,
        value: Vec<u8>,
        lsn: u64,
        timestamp: u64,
    },
    /// Removes the single key `key`.
    Delete {
        key: Vec<u8>,
        lsn: u64,
        timestamp: u64,
    },
    /// Removes a range of keys delimited by `start` and `end`.
    /// NOTE(review): whether `end` is inclusive is not visible here — confirm.
    RangeDelete {
        start: Vec<u8>,
        end: Vec<u8>,
        lsn: u64,
        timestamp: u64,
    },
}
impl Record {
    /// Returns the `lsn` field of whichever variant this is.
    pub fn lsn(&self) -> u64 {
        match self {
            Record::Put { lsn, .. }
            | Record::Delete { lsn, .. }
            | Record::RangeDelete { lsn, .. } => *lsn,
        }
    }

    /// Returns the bytes this record is keyed on.
    ///
    /// For `Put` and `Delete` this is `key`; for `RangeDelete` it is `start`.
    pub fn key(&self) -> &[u8] {
        match self {
            Record::Put { key, .. } | Record::Delete { key, .. } => key.as_slice(),
            Record::RangeDelete { start, .. } => start.as_slice(),
        }
    }

    /// Returns the `timestamp` field of whichever variant this is.
    pub fn timestamp(&self) -> u64 {
        match self {
            Record::Put { timestamp, .. }
            | Record::Delete { timestamp, .. }
            | Record::RangeDelete { timestamp, .. } => *timestamp,
        }
    }

    /// Consumes the record and converts it into a [`RecordEntry`].
    ///
    /// `Put` and `Delete` become `RecordEntry::Point` (with `Some(value)` and
    /// `None` respectively); `RangeDelete` becomes `RecordEntry::Range`.
    pub fn into_entry(self) -> RecordEntry {
        // Both point variants share the same output shape, so normalise them
        // to one tuple and handle the range variant as an early return.
        let (key, value, lsn, timestamp) = match self {
            Record::Put {
                key,
                value,
                lsn,
                timestamp,
            } => (key, Some(value), lsn, timestamp),
            Record::Delete {
                key,
                lsn,
                timestamp,
            } => (key, None, lsn, timestamp),
            Record::RangeDelete {
                start,
                end,
                lsn,
                timestamp,
            } => {
                return RecordEntry::Range(RangeTombstone {
                    start,
                    end,
                    lsn,
                    timestamp,
                });
            }
        };

        RecordEntry::Point(PointEntry {
            key,
            value,
            lsn,
            timestamp,
        })
    }
}
/// The result of [`Record::into_entry`]: either a point entry or a range
/// tombstone.
///
/// Derives `Debug` and `Clone` to match its payload types, which both derive
/// them; without these a `RecordEntry` can be neither logged nor duplicated.
#[derive(Debug, Clone)]
pub enum RecordEntry {
    /// Produced from `Record::Put` (value present) or `Record::Delete`
    /// (value absent).
    Point(PointEntry),
    /// Produced from `Record::RangeDelete`.
    Range(RangeTombstone),
}
/// Two records are equal when their `key()` bytes and `lsn()` match.
///
/// The variant, any value payload, and the timestamp are not compared.
impl PartialEq for Record {
    fn eq(&self, other: &Self) -> bool {
        (self.key(), self.lsn()) == (other.key(), other.lsn())
    }
}
// `Eq` marker: `PartialEq` above compares only key bytes and an integer LSN,
// so equality is reflexive, symmetric and transitive.
impl Eq for Record {}
/// Delegates to the total order defined by `Ord`, so this never returns `None`.
impl PartialOrd for Record {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Orders records by `key()` ascending, then by `lsn()` descending.
///
/// For records sharing a key, the one with the larger LSN sorts first.
impl Ord for Record {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key()
            .cmp(other.key())
            // Reversed so that a higher LSN compares as "smaller".
            .then_with(|| self.lsn().cmp(&other.lsn()).reverse())
    }
}
/// Free-function form of `Record`'s `Ord` comparison.
///
/// Returns exactly what `a.cmp(b)` returns.
// NOTE(review): the lint exemption suggests this has no in-crate caller;
// confirm it is still needed.
#[allow(dead_code)]
pub fn record_cmp(a: &Record, b: &Record) -> std::cmp::Ordering {
    Ord::cmp(a, b)
}
/// A single-key entry: a stored value, or a deletion when `value` is `None`.
#[derive(Clone, Debug)]
pub struct PointEntry {
    /// Key bytes this entry applies to.
    pub key: Vec<u8>,
    /// `Some(bytes)` for a write; `None` when built via [`PointEntry::new_delete`].
    pub value: Option<Vec<u8>>,
    /// Sequence number carried over from the originating record.
    pub lsn: u64,
    /// Timestamp carried over from the originating record.
    pub timestamp: u64,
}

impl PointEntry {
    /// Builds an entry that holds a value.
    ///
    /// `key` and `value` accept anything convertible into `Vec<u8>`.
    pub fn new(
        key: impl Into<Vec<u8>>,
        value: impl Into<Vec<u8>>,
        lsn: u64,
        timestamp: u64,
    ) -> Self {
        let key = key.into();
        let value = Some(value.into());
        Self {
            key,
            value,
            lsn,
            timestamp,
        }
    }

    /// Builds an entry with no value (`value` is `None`).
    pub fn new_delete(key: impl Into<Vec<u8>>, lsn: u64, timestamp: u64) -> Self {
        let key: Vec<u8> = key.into();
        Self {
            key,
            value: None,
            lsn,
            timestamp,
        }
    }
}
/// A deletion covering a range of keys delimited by `start` and `end`.
///
/// NOTE(review): whether `end` is inclusive is not visible here — confirm.
/// No ordering between `start` and `end` is checked on construction.
#[derive(Debug, Clone)]
pub struct RangeTombstone {
    /// First bound of the deleted range.
    pub start: Vec<u8>,
    /// Second bound of the deleted range.
    pub end: Vec<u8>,
    /// Sequence number carried over from the originating record.
    pub lsn: u64,
    /// Timestamp carried over from the originating record.
    pub timestamp: u64,
}

impl RangeTombstone {
    /// Builds a tombstone from its bounds and metadata.
    ///
    /// `start` and `end` accept anything convertible into `Vec<u8>`.
    pub fn new(
        start: impl Into<Vec<u8>>,
        end: impl Into<Vec<u8>>,
        lsn: u64,
        timestamp: u64,
    ) -> Self {
        let start = start.into();
        let end = end.into();
        Self {
            start,
            end,
            lsn,
            timestamp,
        }
    }
}
use std::cmp::Ordering;
use std::collections::BinaryHeap;
/// Merges several `Record` iterators into one stream, using a binary heap to
/// pick the next record to yield.
pub struct MergeIterator<'a> {
/// Source iterators; a heap entry's `source_idx` indexes into this vector.
iters: Vec<Box<dyn Iterator<Item = Record> + 'a>>,
/// Pending records: at most one per source, namely the most recent record
/// pulled from that source that has not been yielded yet.
heap: BinaryHeap<MergeHeapEntry<'a>>,
}
/// One pending record inside the merge heap, tagged with where it came from.
struct MergeHeapEntry<'a> {
/// The record waiting to be yielded.
record: Record,
/// Index into `MergeIterator::iters` of the source that produced `record`;
/// used to pull the replacement record from the same source.
source_idx: usize,
/// Carries the `'a` lifetime parameter; no borrowed data is stored.
_marker: std::marker::PhantomData<&'a ()>,
}
/// Heap ordering for merge entries.
///
/// `BinaryHeap` is a max-heap, so the comparison is reversed: the entry whose
/// record is smallest under `Record`'s `Ord` is popped first.
///
/// Records that compare equal (same key and LSN coming from different
/// sources) are tie-broken by `source_idx`, lowest index first. Without the
/// tie-break their relative output order would depend on the heap's internal
/// layout rather than on the order the sources were supplied in.
impl Ord for MergeHeapEntry<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.record
            .cmp(&other.record)
            .then_with(|| self.source_idx.cmp(&other.source_idx))
            .reverse()
    }
}

/// Delegates to `Ord`; never returns `None`.
impl PartialOrd for MergeHeapEntry<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Equality is defined through `cmp` so that `==` and `Ordering::Equal`
/// always agree, as the `Ord` contract requires.
impl PartialEq for MergeHeapEntry<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for MergeHeapEntry<'_> {}
impl<'a> MergeIterator<'a> {
    /// Creates a merge iterator over `iters`.
    ///
    /// Each source is advanced once immediately; its first record (if any) is
    /// placed in the heap. Sources that are empty contribute nothing.
    pub fn new(mut iters: Vec<Box<dyn Iterator<Item = Record> + 'a>>) -> Self {
        // Lazily pull the first record of every source, tagged with its index.
        let first_entries = iters
            .iter_mut()
            .enumerate()
            .filter_map(|(source_idx, source)| {
                source.next().map(|record| MergeHeapEntry {
                    record,
                    source_idx,
                    _marker: std::marker::PhantomData,
                })
            });

        // Entries are pushed one at a time, in source order.
        let mut heap = BinaryHeap::new();
        for entry in first_entries {
            heap.push(entry);
        }

        Self { iters, heap }
    }
}
/// Yields records from all sources in heap order.
///
/// The output is globally sorted only if every source yields its records in
/// ascending `Record` order, because only one record per source is held in
/// the heap at a time.
impl Iterator for MergeIterator<'_> {
    type Item = Record;

    fn next(&mut self) -> Option<Self::Item> {
        // Take the entry at the top of the heap; an empty heap ends iteration.
        let MergeHeapEntry {
            record: smallest,
            source_idx,
            ..
        } = self.heap.pop()?;

        // Refill from the same source so it stays represented in the heap.
        if let Some(record) = self.iters[source_idx].next() {
            self.heap.push(MergeHeapEntry {
                record,
                source_idx,
                _marker: std::marker::PhantomData,
            });
        }

        Some(smallest)
    }
}