use crate::compaction::merge_iter::MergingIterator;
use crate::error::{Error, Result};
use crate::lsm::sstable::SSTableEntry;
use crate::types::Key;
/// Tuning knobs for [`LeveledCompaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeveledCompactionConfig {
    /// Number of L0 tables at which every L0 table is compacted into L1.
    pub l0_compaction_trigger: usize,
    /// Growth factor between the size budgets of consecutive levels.
    pub level_size_multiplier: usize,
    /// Total number of levels, counting L0; the last one is the bottom level.
    pub max_levels: usize,
    /// Target size in bytes of each compaction output table. It is also the
    /// base unit of the per-level size budgets.
    pub target_file_size: usize,
}

impl Default for LeveledCompactionConfig {
    /// Four L0 tables trigger compaction, levels grow 10x, there are seven
    /// levels, and output tables target 64 MiB.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        LeveledCompactionConfig {
            max_levels: 7,
            level_size_multiplier: 10,
            l0_compaction_trigger: 4,
            target_file_size: 64 * MIB,
        }
    }
}
/// An inclusive key interval `[first_key, last_key]`, e.g. the keys held by
/// one table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyRange {
    /// Smallest key in the range (inclusive).
    pub first_key: Key,
    /// Largest key in the range (inclusive).
    pub last_key: Key,
}

impl KeyRange {
    /// Returns `true` when `self` and `other` have at least one key in common.
    ///
    /// Both bounds are inclusive. Ranges that only touch at an endpoint
    /// (`self.last_key == other.first_key`) therefore count as overlapping.
    pub fn overlaps(&self, other: &KeyRange) -> bool {
        self.first_key <= other.last_key && other.first_key <= self.last_key
    }
}
/// Planner-facing metadata for a single SSTable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SSTableMeta {
    /// Identifier reported back in a [`CompactionPlan`].
    pub id: u64,
    /// Level recorded for this table. `LeveledCompaction::pick_plan` derives
    /// levels from slice position and does not read this field.
    pub level: usize,
    /// Size in bytes; summed per level to compare against the level budget.
    pub size_bytes: u64,
    /// Inclusive range of keys stored in the table.
    pub key_range: KeyRange,
}
/// A compaction chosen by the planner: which tables to read and where to write.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CompactionPlan {
    /// Level the input tables are taken from.
    pub input_level: usize,
    /// Level the merged output is written to (always `input_level + 1`).
    pub output_level: usize,
    /// Ids of the input tables from `input_level`.
    pub input_ids: Vec<u64>,
    /// Ids of the `output_level` tables whose key ranges overlap the inputs.
    pub overlapping_output_ids: Vec<u64>,
}
/// Leveled compaction policy.
///
/// [`LeveledCompaction::pick_plan`] chooses which tables to compact.
/// [`LeveledCompaction::compact_entries`] merges their entries into
/// size-bounded output runs.
#[derive(Debug, Clone)]
pub struct LeveledCompaction {
    config: LeveledCompactionConfig,
}

impl LeveledCompaction {
    /// Builds a policy from `config` after validating it.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidFormat`] when any of the following holds:
    /// `max_levels < 2`, `l0_compaction_trigger == 0`,
    /// `level_size_multiplier < 2`, or `target_file_size == 0`.
    pub fn new(config: LeveledCompactionConfig) -> Result<Self> {
        if config.max_levels < 2 {
            return Err(Error::InvalidFormat("max_levels must be >= 2".into()));
        }
        if config.l0_compaction_trigger == 0 {
            return Err(Error::InvalidFormat(
                "l0_compaction_trigger must be >= 1".into(),
            ));
        }
        if config.level_size_multiplier < 2 {
            return Err(Error::InvalidFormat(
                "level_size_multiplier must be >= 2".into(),
            ));
        }
        if config.target_file_size == 0 {
            return Err(Error::InvalidFormat("target_file_size must be > 0".into()));
        }
        Ok(Self { config })
    }

    /// Chooses the next compaction to run, if any.
    ///
    /// `levels[i]` lists the tables currently in level `i`. Selection works as
    /// follows:
    ///
    /// - L0 has priority. Once it holds at least `l0_compaction_trigger`
    ///   tables, every L0 table is compacted into L1. The plan also lists the
    ///   L1 tables overlapping the combined L0 key range.
    /// - Otherwise, take the first level in `1..bottom` whose summed
    ///   `size_bytes` exceeds its budget (see `level_size_limit`). Its first
    ///   table (in slice order) is pushed into the next level, together with
    ///   the overlapping tables there.
    ///
    /// The bottom level (index `min(max_levels, levels.len()) - 1`) is never an
    /// input, because there is nothing below it.
    ///
    /// Returns `None` when fewer than two levels are given or nothing is over
    /// its threshold.
    pub fn pick_plan(&self, levels: &[Vec<SSTableMeta>]) -> Option<CompactionPlan> {
        if levels.len() < 2 {
            return None;
        }

        let l0 = &levels[0];
        if l0.len() >= self.config.l0_compaction_trigger {
            // `new` rejects a zero trigger, so `l0` is non-empty here and
            // `merge_key_ranges` cannot panic.
            let input_range = merge_key_ranges(l0.iter().map(|m| &m.key_range));
            return Some(CompactionPlan {
                input_level: 0,
                output_level: 1,
                input_ids: l0.iter().map(|m| m.id).collect(),
                overlapping_output_ids: levels[1]
                    .iter()
                    .filter(|m| m.key_range.overlaps(&input_range))
                    .map(|m| m.id)
                    .collect(),
            });
        }

        let bottom = self.config.max_levels.min(levels.len()).saturating_sub(1);
        for level in 1..bottom {
            let total: u64 = levels[level].iter().map(|m| m.size_bytes).sum();
            if total <= self.level_size_limit(level) {
                continue;
            }
            // A level whose size exceeds its positive budget cannot be empty.
            let input = levels[level].first()?;
            return Some(CompactionPlan {
                input_level: level,
                output_level: level + 1,
                input_ids: vec![input.id],
                overlapping_output_ids: levels[level + 1]
                    .iter()
                    .filter(|m| m.key_range.overlaps(&input.key_range))
                    .map(|m| m.id)
                    .collect(),
            });
        }
        None
    }

    /// Size budget in bytes for `level`: `target_file_size * multiplier^level`.
    /// It saturates at `u64::MAX` instead of overflowing.
    fn level_size_limit(&self, level: usize) -> u64 {
        let mult = self.config.level_size_multiplier as u64;
        let base = self.config.target_file_size as u64;
        // `level` stays below `max_levels`, so the `u32` exponent does not truncate
        // for any realistic configuration.
        let pow = mult.saturating_pow(level as u32);
        base.saturating_mul(pow)
    }

    /// Merges `sources` into the entries of output tables for `output_level`.
    ///
    /// Entries are consumed in the order [`MergingIterator`] yields them. They
    /// are grouped into runs by `push_splitting`, using `target_file_size` as
    /// the byte target. Each inner `Vec` of the result is one output table.
    ///
    /// When `output_level` is the last configured level, tombstones are
    /// dropped. Any entries immediately following a tombstone with the same
    /// key are dropped with it.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidFormat`] if `output_level >= max_levels`.
    pub fn compact_entries(
        &self,
        sources: Vec<Box<dyn Iterator<Item = SSTableEntry>>>,
        output_level: usize,
    ) -> Result<Vec<Vec<SSTableEntry>>> {
        if output_level >= self.config.max_levels {
            return Err(Error::InvalidFormat("output_level out of range".into()));
        }
        let drop_tombstones = output_level + 1 >= self.config.max_levels;

        let mut out_files: Vec<Vec<SSTableEntry>> = Vec::new();
        let mut current: Vec<SSTableEntry> = Vec::new();
        let mut current_bytes: usize = 0;
        let mut iter = MergingIterator::new(sources).peekable();

        while let Some(entry) = iter.next() {
            if drop_tombstones && entry.value.is_none() {
                // NOTE(review): assumes MergingIterator yields a key's versions
                // newest-first, so everything after the tombstone is shadowed.
                // The key is compared by reference, so no allocation is needed
                // for the skip.
                while let Some(next) = iter.peek() {
                    if next.key != entry.key {
                        break;
                    }
                    let _ = iter.next();
                }
                continue;
            }
            push_splitting(
                &mut out_files,
                &mut current,
                &mut current_bytes,
                entry,
                self.config.target_file_size,
            );
        }

        if !current.is_empty() {
            out_files.push(current);
        }
        Ok(out_files)
    }
}
/// Returns the smallest [`KeyRange`] that covers every range in `ranges`.
///
/// On ties the earliest bound seen wins, though equal keys make the result
/// identical either way.
///
/// # Panics
///
/// Panics with `"non-empty ranges"` if `ranges` yields nothing.
fn merge_key_ranges<'a, I>(ranges: I) -> KeyRange
where
    I: IntoIterator<Item = &'a KeyRange>,
{
    let mut ranges = ranges.into_iter();
    let seed = ranges.next().expect("non-empty ranges");
    // Track borrowed bounds and clone only the final pair.
    let (lowest, highest) = ranges.fold(
        (&seed.first_key, &seed.last_key),
        |(lowest, highest), r| {
            let lowest = if r.first_key < *lowest {
                &r.first_key
            } else {
                lowest
            };
            let highest = if r.last_key > *highest {
                &r.last_key
            } else {
                highest
            };
            (lowest, highest)
        },
    );
    KeyRange {
        first_key: lowest.clone(),
        last_key: highest.clone(),
    }
}
/// Estimates the encoded size of `entry` in bytes, for output-table splitting.
///
/// The estimate is a fixed 25-byte header, plus the key length, plus the value
/// length. A tombstone (`value == None`) contributes no value bytes.
/// NOTE(review): the header presumably mirrors the record layout (a 1-byte
/// flag, two `u64`s and two `u32` lengths); confirm against the SSTable encoder.
fn estimate_entry_size(entry: &SSTableEntry) -> usize {
    const HEADER_BYTES: usize = 1 + 8 + 8 + 4 + 4;
    let value_bytes = match &entry.value {
        Some(value) => value.len(),
        None => 0,
    };
    HEADER_BYTES + entry.key.len() + value_bytes
}
/// Appends `entry` to the output run being built (`current`, which holds
/// `current_bytes` estimated bytes).
///
/// Before appending, `current` is sealed into `out_files` when both of these
/// hold:
/// - adding the entry would exceed `target_bytes`;
/// - the entry starts a new key.
///
/// A run is never sealed between two entries that share a key. The merge may
/// yield several versions of one key. Splitting them would give neighbouring
/// tables in one level overlapping (inclusive) key ranges.
/// `LeveledCompaction::pick_plan` pushes a single table from levels >= 1 down
/// a level. It could then move the newer version below the older one left
/// behind, and a level-by-level read might surface the stale version.
///
/// As a result a run may exceed `target_bytes` when one key has many versions.
/// An entry larger than `target_bytes` still goes into a run of its own unless
/// it continues the previous key.
fn push_splitting(
    out_files: &mut Vec<Vec<SSTableEntry>>,
    current: &mut Vec<SSTableEntry>,
    current_bytes: &mut usize,
    entry: SSTableEntry,
    target_bytes: usize,
) {
    let size = estimate_entry_size(&entry);
    let should_seal = match current.last() {
        // Only split at key boundaries, and only when the target would be exceeded.
        Some(last) => last.key != entry.key && current_bytes.saturating_add(size) > target_bytes,
        // An empty run always accepts the entry, whatever its size.
        None => false,
    };
    if should_seal {
        out_files.push(std::mem::take(current));
        *current_bytes = 0;
    }
    *current_bytes = current_bytes.saturating_add(size);
    current.push(entry);
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;

    fn meta(id: u64, level: usize, size: u64, first: &[u8], last: &[u8]) -> SSTableMeta {
        SSTableMeta {
            id,
            level,
            size_bytes: size,
            key_range: KeyRange {
                first_key: first.to_vec(),
                last_key: last.to_vec(),
            },
        }
    }

    fn put(key: &[u8], value: &[u8], ts: u64, seq: u64) -> SSTableEntry {
        SSTableEntry {
            key: key.to_vec(),
            value: Some(value.to_vec()),
            timestamp: ts,
            sequence: seq,
        }
    }

    fn del(key: &[u8], ts: u64, seq: u64) -> SSTableEntry {
        SSTableEntry {
            key: key.to_vec(),
            value: None,
            timestamp: ts,
            sequence: seq,
        }
    }

    #[test]
    fn picks_l0_trigger() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig {
            l0_compaction_trigger: 2,
            ..Default::default()
        })
        .unwrap();
        let levels = vec![
            vec![meta(1, 0, 10, b"a", b"b"), meta(2, 0, 10, b"c", b"d")],
            vec![meta(10, 1, 10, b"b", b"c"), meta(11, 1, 10, b"x", b"z")],
        ];
        let plan = comp.pick_plan(&levels).unwrap();
        assert_eq!(plan.input_level, 0);
        assert_eq!(plan.output_level, 1);
        assert_eq!(plan.input_ids, vec![1, 2]);
        assert_eq!(plan.overlapping_output_ids, vec![10]);
    }

    #[test]
    fn picks_size_trigger_for_l1() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig {
            l0_compaction_trigger: 100,
            target_file_size: 10,
            level_size_multiplier: 2,
            max_levels: 4,
        })
        .unwrap();
        let levels = vec![
            vec![],
            vec![meta(5, 1, 100, b"a", b"z")],
            vec![meta(6, 2, 10, b"m", b"n")],
            vec![],
        ];
        let plan = comp.pick_plan(&levels).unwrap();
        assert_eq!(plan.input_level, 1);
        assert_eq!(plan.output_level, 2);
        assert_eq!(plan.input_ids, vec![5]);
        assert_eq!(plan.overlapping_output_ids, vec![6]);
    }

    #[test]
    fn bottom_level_drops_tombstone_and_older_versions() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig {
            max_levels: 3,
            target_file_size: 1024,
            ..Default::default()
        })
        .unwrap();
        let sources: Vec<Box<dyn Iterator<Item = SSTableEntry>>> = vec![
            Box::new(
                vec![
                    del(b"k", 20, 1),
                    put(b"k", b"old", 10, 1),
                    put(b"x", b"v", 1, 1),
                ]
                .into_iter(),
            ),
            Box::new(vec![put(b"a", b"1", 5, 1)].into_iter()),
        ];
        let out = comp.compact_entries(sources, 2).unwrap();
        let flat: Vec<_> = out.into_iter().flatten().collect();
        assert!(flat.iter().all(|e| e.key != b"k".to_vec()));
        assert!(flat.iter().any(|e| e.key == b"x".to_vec()));
    }

    #[test]
    fn rejects_invalid_config() {
        let base = LeveledCompactionConfig::default();
        let rejects = |cfg: LeveledCompactionConfig| LeveledCompaction::new(cfg).is_err();
        assert!(LeveledCompaction::new(base).is_ok());
        assert!(rejects(LeveledCompactionConfig {
            max_levels: 1,
            ..base
        }));
        assert!(rejects(LeveledCompactionConfig {
            l0_compaction_trigger: 0,
            ..base
        }));
        assert!(rejects(LeveledCompactionConfig {
            level_size_multiplier: 1,
            ..base
        }));
        assert!(rejects(LeveledCompactionConfig {
            target_file_size: 0,
            ..base
        }));
    }

    #[test]
    fn no_plan_when_under_thresholds() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig::default()).unwrap();
        let levels = vec![
            vec![meta(1, 0, 10, b"a", b"b")],
            vec![meta(2, 1, 10, b"a", b"z")],
            vec![],
        ];
        assert_eq!(comp.pick_plan(&levels), None);
        // Fewer than two levels can never produce a plan.
        assert_eq!(comp.pick_plan(&levels[..1]), None);
    }

    #[test]
    fn bottom_level_is_never_an_input() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig {
            l0_compaction_trigger: 100,
            target_file_size: 10,
            level_size_multiplier: 2,
            max_levels: 3,
        })
        .unwrap();
        let levels = vec![
            vec![],
            vec![meta(1, 1, 10, b"a", b"b")],
            vec![meta(2, 2, 1_000_000, b"a", b"z")],
        ];
        assert_eq!(comp.pick_plan(&levels), None);
    }

    #[test]
    fn rejects_output_level_out_of_range() {
        let comp = LeveledCompaction::new(LeveledCompactionConfig::default()).unwrap();
        assert!(comp.compact_entries(Vec::new(), 7).is_err());
    }
}