use crate::types::{
key::InternalKey,
sequence::{OpType, SeqNum},
value::Row,
};
/// One key/row version flowing through a compaction, tagged with the input
/// file it was read from.
#[derive(Clone, Debug)]
pub struct CompactionEntry {
/// Internal key of this version. The compaction code in this file reads its
/// user-key bytes, `seq` and `op_type`, and sorts entries by the key's `Ord`.
pub ikey: InternalKey,
/// Row payload carried alongside the key.
pub row: Row,
/// `file_idx` of the `FileEntries` this entry was taken from.
pub source_file_idx: usize,
/// Third element of the originating `FileEntries::entries` tuple.
/// NOTE(review): presumably the row's position inside the source file —
/// confirm against the code that builds `FileEntries`.
pub row_position: u32,
}
/// The rows contributed by a single input file to a compaction.
#[derive(Clone, Debug)]
pub struct FileEntries {
/// Caller-assigned index of the input file; copied into
/// `CompactionEntry::source_file_idx` for every row of this file.
pub file_idx: usize,
/// `(internal key, row, u32)` tuples; the `u32` is copied into
/// `CompactionEntry::row_position`. No ordering is required here because
/// `CompactionIterator::new` sorts the merged input itself.
pub entries: Vec<(InternalKey, Row, u32)>, }
/// Iterator over the entries that survive a compaction.
///
/// All filtering happens eagerly in `CompactionIterator::new`; iteration then
/// walks the stored vector with a cursor.
pub struct CompactionIterator {
// Surviving entries, in the order produced by sorting on `ikey`.
entries: Vec<CompactionEntry>,
// Index of the next entry that `next()` will yield.
pos: usize,
}
impl CompactionIterator {
    /// Merges the rows of every input file, sorts them by internal key and
    /// eagerly discards the versions that no reader can still observe.
    ///
    /// # Parameters
    /// * `file_entries` – rows of each input file; they need not be sorted.
    /// * `oldest_snapshot_seq` – sequence number of the oldest live snapshot.
    /// * `drop_tombstones` – when `true`, a `Delete` that lies strictly below
    ///   `oldest_snapshot_seq` and is the version that snapshot reads is
    ///   discarded together with everything it shadows.
    ///
    /// # Retention rule (per user key)
    /// * Versions with `seq >= oldest_snapshot_seq` are always retained.
    /// * Of the versions with `seq < oldest_snapshot_seq`, only the newest one
    ///   is retained, and only if no version with `seq == oldest_snapshot_seq`
    ///   exists: it is the version the oldest snapshot reads. Everything older
    ///   is shadowed and dropped.
    /// * That newest-below-snapshot version is itself dropped when it is a
    ///   `Delete` and `drop_tombstones` is set.
    ///
    /// NOTE(review): this assumes a snapshot at sequence `S` observes the
    /// newest version with `seq <= S`, and that `InternalKey`'s `Ord` groups
    /// the versions of one user key together with the newest first — confirm
    /// against the `InternalKey` implementation.
    pub fn new(
        file_entries: Vec<FileEntries>,
        oldest_snapshot_seq: SeqNum,
        drop_tombstones: bool,
    ) -> Self {
        // Flatten every file into one vector, sized up front to avoid regrowth.
        let total_rows: usize = file_entries.iter().map(|fe| fe.entries.len()).sum();
        let mut all: Vec<CompactionEntry> = Vec::with_capacity(total_rows);
        for fe in file_entries {
            let source_file_idx = fe.file_idx;
            all.extend(
                fe.entries
                    .into_iter()
                    .map(move |(ikey, row, row_position)| CompactionEntry {
                        ikey,
                        row,
                        source_file_idx,
                        row_position,
                    }),
            );
        }

        // Stable sort: entries with equal internal keys keep their input order.
        all.sort_by(|a, b| a.ikey.cmp(&b.ikey));

        let mut retained: Vec<CompactionEntry> = Vec::new();
        // User key of the group currently being processed.
        let mut last_uk: Option<Vec<u8>> = None;
        // True once the version the oldest snapshot reads for the current
        // user key has been accounted for (retained, or dropped as a tombstone).
        let mut snapshot_covered = false;

        for entry in all {
            let same_key = match last_uk.as_deref() {
                Some(prev) => prev == &entry.ikey.user_key_bytes()[..],
                None => false,
            };
            if !same_key {
                // Copy the key bytes only when the user key actually changes.
                last_uk = Some(entry.ikey.user_key_bytes().to_vec());
                snapshot_covered = false;
            }

            if entry.ikey.seq < oldest_snapshot_seq {
                if snapshot_covered {
                    // Shadowed by a newer version that the oldest snapshot
                    // already reads: unreachable for every reader.
                    continue;
                }
                // Newest version at or below the oldest snapshot.
                snapshot_covered = true;
                if drop_tombstones && entry.ikey.op_type == OpType::Delete {
                    // The tombstone and everything beneath it can go.
                    continue;
                }
            } else if entry.ikey.seq <= oldest_snapshot_seq {
                // Exactly at the snapshot: this is the version it reads.
                snapshot_covered = true;
            }

            retained.push(entry);
        }

        Self {
            entries: retained,
            pos: 0,
        }
    }

    /// Total number of entries that survived compaction.
    ///
    /// This is the size of the whole result set and does not shrink as the
    /// iterator is advanced.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no entry survived compaction.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
impl Iterator for CompactionIterator {
    type Item = CompactionEntry;

    /// Returns a clone of the entry under the cursor and moves the cursor
    /// forward by one; yields `None` once every stored entry has been returned.
    fn next(&mut self) -> Option<Self::Item> {
        // `get` returns `None` past the end, which `?` propagates unchanged
        // without touching the cursor.
        let current = self.entries.get(self.pos)?.clone();
        self.pos += 1;
        Some(current)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{
        schema::{ColumnDef, ColumnType, TableSchema},
        sequence::OpType,
        value::FieldValue,
    };

    /// Table `t` with a single non-nullable `Int64` column `id` that is also
    /// the primary key.
    fn schema() -> TableSchema {
        let id_column = ColumnDef {
            name: "id".into(),
            col_type: ColumnType::Int64,
            nullable: false,
            ..Default::default()
        };
        TableSchema {
            table_name: "t".into(),
            columns: vec![id_column],
            primary_key: vec![0],
            ..Default::default()
        }
    }

    /// Encodes an internal key for primary key `pk` at sequence `seq`.
    fn make_ikey(pk: i64, seq: u64, op: OpType) -> InternalKey {
        let table = schema();
        InternalKey::encode(&[FieldValue::Int64(pk)], SeqNum(seq), op, &table).unwrap()
    }

    /// One `FileEntries::entries` tuple carrying a default (empty) row.
    fn version(pk: i64, seq: u64, op: OpType, row_pos: u32) -> (InternalKey, Row, u32) {
        (make_ikey(pk, seq, op), Row::default(), row_pos)
    }

    /// Wraps a list of versions as the contents of input file `file_idx`.
    fn file(file_idx: usize, entries: Vec<(InternalKey, Row, u32)>) -> FileEntries {
        FileEntries { file_idx, entries }
    }

    /// Runs a compaction over `files` and collects everything it yields.
    fn compact(files: Vec<FileEntries>, oldest_snapshot: u64, purge: bool) -> Vec<CompactionEntry> {
        CompactionIterator::new(files, SeqNum(oldest_snapshot), purge).collect()
    }

    /// With the snapshot above every version, only the newest version of each
    /// key is emitted.
    #[test]
    fn dedup_keeps_latest() {
        let files = vec![file(
            0,
            vec![
                version(1, 10, OpType::Put, 0),
                version(1, 5, OpType::Put, 1),
                version(2, 8, OpType::Put, 2),
            ],
        )];
        let results = compact(files, 100, false);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].ikey.seq, SeqNum(10));
        assert_eq!(results[1].ikey.seq, SeqNum(8));
    }

    /// A tombstone below the oldest snapshot is removed when tombstone
    /// dropping is enabled.
    #[test]
    fn drop_tombstones() {
        let files = vec![file(
            0,
            vec![
                version(1, 10, OpType::Delete, 0),
                version(2, 8, OpType::Put, 1),
            ],
        )];
        let results = compact(files, 100, true);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].ikey.seq, SeqNum(8));
    }

    /// Older versions are dropped or kept depending on where the oldest
    /// snapshot sits relative to them.
    #[test]
    fn snapshot_aware_version_dropping() {
        let files = vec![file(
            0,
            vec![
                version(1, 10, OpType::Put, 0),
                version(1, 5, OpType::Put, 1),
                version(2, 8, OpType::Put, 2),
            ],
        )];

        // Snapshot above everything: the overwritten version of key 1 goes.
        let results = compact(files.clone(), 100, false);
        assert_eq!(
            results.len(),
            2,
            "old version should be dropped when no reader needs it"
        );

        // Snapshot below everything: nothing may be discarded.
        let results = compact(files, 3, false);
        assert_eq!(
            results.len(),
            3,
            "old version must be preserved when oldest_snapshot_seq is below it"
        );
        let key1_version_count = results
            .iter()
            .filter(|e| matches!(e.ikey.seq.0, 10 | 5))
            .count();
        assert_eq!(
            key1_version_count,
            2,
            "both versions of key=1 must survive"
        );
    }

    /// Versions of the same key coming from different files are merged, and
    /// the survivor remembers which file it came from.
    #[test]
    fn merge_across_files() {
        let newer_file = file(
            0,
            vec![
                version(1, 10, OpType::Put, 0),
                version(3, 10, OpType::Put, 1),
            ],
        );
        let older_file = file(
            1,
            vec![
                version(1, 5, OpType::Put, 0),
                version(2, 8, OpType::Put, 1),
            ],
        );
        let results = compact(vec![newer_file, older_file], 100, false);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].ikey.seq, SeqNum(10));
        assert_eq!(results[0].source_file_idx, 0);
    }

    /// With the oldest snapshot (3) below every version, the tombstone and
    /// both Puts survive even though tombstone dropping is enabled.
    #[test]
    fn tombstone_preserved_when_snapshot_pins_older_put() {
        let files = vec![file(
            0,
            vec![
                version(1, 15, OpType::Delete, 0),
                version(1, 10, OpType::Put, 1),
                version(1, 5, OpType::Put, 2),
            ],
        )];
        let results = compact(files, 3, true);
        assert_eq!(
            results.len(),
            3,
            "tombstone must be preserved when snapshot-pinned older Puts survive"
        );
        assert_eq!(results[0].ikey.op_type, OpType::Delete);
        assert_eq!(results[0].ikey.seq, SeqNum(15));
    }

    /// With the snapshot above everything, the tombstone and the Put it
    /// shadows both disappear; only the unrelated key remains.
    #[test]
    fn tombstone_dropped_when_no_pinned_versions() {
        let files = vec![file(
            0,
            vec![
                version(1, 15, OpType::Delete, 0),
                version(1, 10, OpType::Put, 1),
                version(2, 8, OpType::Put, 2),
            ],
        )];
        let results = compact(files, 100, true);
        assert_eq!(results.len(), 1, "only key=2 should survive");
        assert_eq!(results[0].ikey.seq, SeqNum(8));
    }
}