use std::cmp::Ordering as CmpOrdering;
use std::collections::HashSet;
use std::fs::remove_file;
use std::path::Path;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use std::thread::scope;
use ruc::*;
use crate::cache::block_cache::BlockCache;
use crate::cache::table_cache::TableCache;
use crate::error::Result;
use crate::iterator::merge::{IterSource, MergingIterator};
use crate::iterator::range_del::RangeTombstoneTracker;
use crate::manifest::version::{TableFile, Version};
use crate::manifest::version_edit::{FileMetaData, VersionEdit};
use crate::manifest::version_set::VersionSet;
use crate::options::{CompactionFilterDecision, DbOptions};
use crate::rate_limiter::RateLimiter;
use crate::sst::table_builder::{TableBuildOptions, TableBuilder};
use crate::sst::table_reader::TableIterator;
use crate::stats::DbStats;
use crate::types::{
InternalKey, InternalKeyRef, LazyValue, MAX_SEQUENCE_NUMBER, SequenceNumber, ValueType,
compare_internal_key, user_key,
};
/// A read-driven suggestion that `level` may benefit from compaction.
#[derive(Debug, Clone)]
pub struct CompactionHint {
// Level the hint applies to (hints for L0 / last level are ignored by
// `pick_compaction_for_hint`).
pub level: usize,
// Number of reads observed that motivated this hint.
pub read_count: u64,
}
/// Borrowed environment shared by every compaction routine: paths, options,
/// and optional throttling/accounting hooks.
pub(crate) struct CompactionContext<'a> {
pub db_path: &'a Path,
pub options: &'a DbOptions,
// When present, compaction writes are throttled through this limiter.
pub rate_limiter: Option<&'a Arc<RateLimiter>>,
// When present, compaction byte counts / completions are recorded here.
pub stats: Option<&'a Arc<DbStats>>,
// Sequence numbers of live snapshots; entries visible to any of these are
// preserved during deduplication.
pub active_snapshots: &'a [SequenceNumber],
}
/// A unit of compaction work: merge `input_files_level` (from `level`) with
/// `input_files_next` (from `level + 1`) into new files on `level + 1`.
pub struct CompactionTask {
pub level: usize,
pub input_files_level: Vec<TableFile>,
pub input_files_next: Vec<TableFile>,
}
/// Result of the I/O phase of a compaction, ready to be installed into the
/// version set by `install_compaction`.
pub struct CompactionOutput {
// Version edit describing the files added and deleted.
pub edit: VersionEdit,
// Numbers of the input files consumed (used for cache eviction/cleanup).
pub input_file_numbers: HashSet<u64>,
pub files_produced: u64,
// Next unused file number after the compaction's allocations.
pub next_file_number_hint: u64,
}
/// File numbers of obsolete SSTs to delete after a compaction is installed.
pub struct PostCompactionCleanup {
pub files_to_delete: HashSet<u64>,
}
impl CompactionTask {
    /// Total on-disk size in bytes of every input file (both levels).
    pub fn total_input_size(&self) -> u64 {
        let mut total: u64 = 0;
        for file in self.input_files_level.iter().chain(&self.input_files_next) {
            total += file.meta.file_size;
        }
        total
    }
}
/// Namespace type for the leveled-compaction strategy; all behavior lives in
/// associated functions on the `impl` block below.
pub struct LeveledCompaction;
/// Gather every range-deletion tombstone from `files` as
/// `(internal_key, end_user_key)` pairs, sorted by internal key so they can
/// feed the merging iterator directly.
fn collect_range_del_entries(files: &[TableFile]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
    let mut out: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    for file in files.iter().filter(|tf| tf.meta.has_range_deletions) {
        for (start, end, seq) in file.reader.get_range_tombstones().c(d!())? {
            let key = InternalKey::new(&start, seq, ValueType::RangeDeletion);
            out.push((key.as_bytes().to_vec(), end));
        }
    }
    out.sort_by(|lhs, rhs| compare_internal_key(&lhs.0, &rhs.0));
    Ok(out)
}
/// One shard of a compaction, restricted to the half-open user-key range
/// `[lower_bound, upper_bound)`; `None` means unbounded on that side.
struct SubCompactionTask {
lower_bound: Option<Vec<u8>>,
upper_bound: Option<Vec<u8>>,
input_files_level: Vec<TableFile>,
input_files_next: Vec<TableFile>,
}
/// Parameters shared (read-only) by every sub-compaction of one compaction.
struct SubCompactionParams<'a> {
target_level: usize,
// True when no deeper level overlaps the inputs; enables tombstone drop
// and sequence-number zeroing.
is_bottommost: bool,
build_opts: &'a TableBuildOptions,
oldest_snapshot_seq: SequenceNumber,
// Atomic so parallel sub-compactions can allocate file numbers lock-free.
file_number_counter: &'a AtomicU64,
// All range-deletion entries from every input file (shared across shards,
// since tombstones may span shard boundaries).
all_range_del_entries: &'a [(Vec<u8>, Vec<u8>)],
all_raw_tombstones: &'a [(Vec<u8>, Vec<u8>, SequenceNumber)],
}
/// Files written by a single sub-compaction, as `(level, meta)` pairs.
struct SubCompactionOutput {
new_files: Vec<(u32, FileMetaData)>,
files_produced: u64,
}
/// Choose user-key split points that partition a compaction into at most
/// `max_subs` sub-ranges, using the next-level file boundaries as candidates
/// so each shard aligns with existing files.
fn compute_split_points(task: &CompactionTask, max_subs: usize) -> Vec<Vec<u8>> {
    if max_subs <= 1 {
        return Vec::new();
    }
    let next_files = &task.input_files_next;
    if next_files.len() <= 1 {
        return Vec::new();
    }
    // Candidates: the largest user key of every next-level file but the last.
    let boundaries: Vec<Vec<u8>> = next_files
        .iter()
        .take(next_files.len() - 1)
        .map(|tf| user_key(&tf.meta.largest_key).to_vec())
        .collect();
    if boundaries.is_empty() {
        return Vec::new();
    }
    let desired = (max_subs - 1).min(boundaries.len());
    if desired >= boundaries.len() {
        return boundaries;
    }
    // Take `desired` roughly evenly spaced candidates.
    (1..=desired)
        .map(|i| {
            let idx = i * boundaries.len() / (desired + 1);
            boundaries[idx.min(boundaries.len() - 1)].clone()
        })
        .collect()
}
/// Partition `task` into one [`SubCompactionTask`] per key range delimited by
/// `split_points` (producing `split_points.len() + 1` sub-tasks).
///
/// Each sub-task receives only the input files whose user-key range overlaps
/// its half-open `[lower, upper)` range. An L0 compaction gives every
/// sub-task the full L0 input set because L0 files may overlap each other.
fn build_sub_tasks(task: &CompactionTask, split_points: &[Vec<u8>]) -> Vec<SubCompactionTask> {
    // Shared overlap predicate (previously duplicated for both levels):
    // true when the file's user-key range intersects [lower, upper);
    // a `None` bound is unbounded on that side.
    let overlaps = |tf: &TableFile, lower: &Option<Vec<u8>>, upper: &Option<Vec<u8>>| {
        let file_smallest = user_key(&tf.meta.smallest_key);
        let file_largest = user_key(&tf.meta.largest_key);
        lower.as_deref().is_none_or(|lo| file_largest >= lo)
            && upper.as_deref().is_none_or(|hi| file_smallest < hi)
    };
    let n = split_points.len() + 1;
    let mut sub_tasks = Vec::with_capacity(n);
    for i in 0..n {
        // Range i runs from split_points[i - 1] (inclusive) up to
        // split_points[i] (exclusive); the first/last ranges are unbounded.
        let lower = (i > 0).then(|| split_points[i - 1].clone());
        let upper = (i < n - 1).then(|| split_points[i].clone());
        let sub_next: Vec<TableFile> = task
            .input_files_next
            .iter()
            .filter(|tf| overlaps(tf, &lower, &upper))
            .cloned()
            .collect();
        let sub_level: Vec<TableFile> = if task.level == 0 {
            // L0 files can overlap arbitrarily, so no per-range filtering.
            task.input_files_level.clone()
        } else {
            task.input_files_level
                .iter()
                .filter(|tf| overlaps(tf, &lower, &upper))
                .cloned()
                .collect()
        };
        sub_tasks.push(SubCompactionTask {
            lower_bound: lower,
            upper_bound: upper,
            input_files_level: sub_level,
            input_files_next: sub_next,
        });
    }
    sub_tasks
}
/// A raw range tombstone: (begin user key, end user key, sequence number).
type RawTombstone = (Vec<u8>, Vec<u8>, SequenceNumber);
/// Collect the raw (unsorted) range tombstones from every input file that
/// declares range deletions.
fn collect_raw_tombstones(files: &[TableFile]) -> Result<Vec<RawTombstone>> {
    let mut out: Vec<RawTombstone> = Vec::new();
    for file in files {
        if !file.meta.has_range_deletions {
            continue;
        }
        out.extend(file.reader.get_range_tombstones().c(d!())?);
    }
    Ok(out)
}
/// Execute the I/O phase of one sub-compaction: merge the input tables (plus
/// the shared range-deletion entries), drop entries made obsolete by newer
/// versions, tombstones, snapshots, or the compaction filter, and write the
/// survivors into new SSTs destined for `params.target_level`.
///
/// Entries outside `[sub.lower_bound, sub.upper_bound)` are excluded via an
/// initial seek and an early break. On a merge error, any SSTs already
/// written by this sub-compaction are removed before the error is returned.
fn execute_sub_compaction_io(
ctx: &CompactionContext<'_>,
sub: &SubCompactionTask,
params: &SubCompactionParams<'_>,
) -> Result<SubCompactionOutput> {
// One iterator source per input table, plus one synthetic source carrying
// every range-deletion entry (shared across all sub-compactions).
let mut sources: Vec<IterSource> = Vec::new();
for tf in &sub.input_files_level {
let iter = TableIterator::new(tf.reader.clone());
sources.push(IterSource::from_boxed(Box::new(iter)));
}
for tf in &sub.input_files_next {
let iter = TableIterator::new(tf.reader.clone());
sources.push(IterSource::from_boxed(Box::new(iter)));
}
if !params.all_range_del_entries.is_empty() {
sources.push(IterSource::new(params.all_range_del_entries.to_vec()));
}
let mut merger = MergingIterator::new(sources, compare_internal_key);
// Seek to the shard's lower bound; MAX_SEQUENCE_NUMBER sorts before every
// real version of that user key, so no version of the bound key is skipped.
if let Some(ref lo) = sub.lower_bound {
let seek_key = InternalKey::new(lo, MAX_SEQUENCE_NUMBER, ValueType::Value)
.as_bytes()
.to_vec();
merger.seek(&seek_key);
}
let mut new_files: Vec<(u32, FileMetaData)> = Vec::new();
let mut builder: Option<TableBuilder> = None;
let mut current_file_number = 0u64;
let mut next_file_idx = 0u64;
let mut current_size = 0usize;
// Deduplication state: the most recent point/range key seen, the sequence
// last written for it, and a cursor into the (sorted) snapshot list.
let mut last_point_key: Option<Vec<u8>> = None;
let mut last_range_del_key: Option<Vec<u8>> = None;
let mut last_written_seq: SequenceNumber = 0;
let mut snapshot_idx: usize = ctx.active_snapshots.len();
// Pre-load all tombstones so a shard sees deletions whose start key lies
// in an earlier shard's range.
let mut range_tombstones = RangeTombstoneTracker::new();
for (begin, end, seq) in params.all_raw_tombstones {
range_tombstones.add(begin.clone(), end.clone(), *seq);
}
// NOTE(review): reset() appears to rewind the tracker's internal scan
// position after mutation — confirm against RangeTombstoneTracker.
range_tombstones.reset();
while let Some((ikey, value)) = merger.next_entry() {
// Internal keys carry an 8-byte trailer; anything shorter is malformed.
if ikey.len() < 8 {
continue;
}
let ikr = InternalKeyRef::new(&ikey);
let user_key = ikr.user_key();
// Past the shard's exclusive upper bound: this shard is done.
if let Some(ref hi) = sub.upper_bound
&& user_key >= hi.as_slice()
{
break;
}
if ikr.value_type() == ValueType::RangeDeletion {
range_tombstones.add(user_key.to_vec(), value.as_slice().to_vec(), ikr.sequence());
range_tombstones.reset();
// Skip duplicate tombstone entries (same internal key seen from
// multiple inputs).
if let Some(ref last) = last_range_del_key
&& last.as_slice() == ikey.as_slice()
{
continue;
}
last_range_del_key = Some(ikey.clone());
// At the bottommost level a tombstone older than every snapshot
// has nothing left to shadow, so it can be dropped.
if params.is_bottommost && ikr.sequence() < params.oldest_snapshot_seq {
continue;
}
} else if let Some(ref last) = last_point_key
&& last.as_slice() == user_key
{
// Older version of the key we just wrote: keep it only if some
// active snapshot can still observe it (i.e. a snapshot falls
// between this version and the one already written).
while snapshot_idx > 0 && ctx.active_snapshots[snapshot_idx - 1] >= last_written_seq {
snapshot_idx -= 1;
}
if snapshot_idx > 0 && ctx.active_snapshots[snapshot_idx - 1] >= ikr.sequence() {
last_written_seq = ikr.sequence();
} else {
continue;
}
} else {
// First (newest) version of a new user key.
last_point_key = Some(user_key.to_vec());
last_written_seq = ikr.sequence();
snapshot_idx = ctx.active_snapshots.len();
// Bottommost point tombstones older than every snapshot are dead.
if ikr.value_type() == ValueType::Deletion
&& params.is_bottommost
&& ikr.sequence() < params.oldest_snapshot_seq
{
continue;
}
// Values covered by a newer range tombstone are dropped.
if ikr.value_type() == ValueType::Value && !range_tombstones.is_empty() {
let entry_seq = ikr.sequence();
if range_tombstones.is_deleted(user_key, entry_seq, params.oldest_snapshot_seq) {
continue;
}
}
}
// User-supplied compaction filter may keep, drop, or rewrite values.
let mut final_value = value;
if let Some(ref filter) = ctx.options.compaction_filter
&& ikr.value_type() == ValueType::Value
{
match filter.filter(params.target_level, user_key, final_value.as_slice()) {
CompactionFilterDecision::Keep => {}
CompactionFilterDecision::Remove => continue,
CompactionFilterDecision::ChangeValue(new_val) => {
final_value = LazyValue::Inline(new_val);
}
}
}
// Lazily open the next output file on first surviving entry.
if builder.is_none() {
current_file_number = params.file_number_counter.fetch_add(1, Ordering::Relaxed);
next_file_idx += 1;
let sst_path = ctx.db_path.join(format!("{:06}.sst", current_file_number));
let mut opts = params.build_opts.clone();
opts.block_property_collectors = ctx
.options
.block_property_collectors
.iter()
.map(|f| f())
.collect();
builder = Some(TableBuilder::new(&sst_path, opts).c(d!())?);
current_size = 0;
}
// At the bottommost level the sequence number of a value no snapshot
// needs can be zeroed, which improves key compression downstream.
let final_ikey;
let ikey_ref = if params.is_bottommost
&& ikr.sequence() > 0
&& ikr.sequence() < params.oldest_snapshot_seq
&& ikr.value_type() == ValueType::Value
{
final_ikey = InternalKey::new(user_key, 0, ikr.value_type())
.as_bytes()
.to_vec();
&final_ikey
} else {
&ikey
};
let entry_bytes = ikey_ref.len() + final_value.len();
builder
.as_mut()
.unwrap()
.add(ikey_ref, final_value.as_slice())
.c(d!())?;
current_size += entry_bytes;
// Throttle writes when a rate limiter is configured.
if let Some(rl) = ctx.rate_limiter {
rl.request(entry_bytes);
}
// Roll to a new output file once the size target is reached.
if current_size >= ctx.options.target_file_size_base as usize {
let result = builder.take().unwrap().finish().c(d!())?;
if let Some(s) = ctx.stats {
s.record_compaction_bytes(result.file_size);
}
new_files.push((
params.target_level as u32,
FileMetaData {
number: current_file_number,
file_size: result.file_size,
smallest_key: result.smallest_key.unwrap_or_default(),
largest_key: result.largest_key.unwrap_or_default(),
has_range_deletions: result.has_range_deletions,
},
));
}
}
// Flush the final, partially-filled output file.
if let Some(b) = builder {
let result = b.finish().c(d!())?;
if let Some(s) = ctx.stats {
s.record_compaction_bytes(result.file_size);
}
new_files.push((
params.target_level as u32,
FileMetaData {
number: current_file_number,
file_size: result.file_size,
smallest_key: result.smallest_key.unwrap_or_default(),
largest_key: result.largest_key.unwrap_or_default(),
has_range_deletions: result.has_range_deletions,
},
));
}
// If the merge failed mid-way, delete everything we wrote: the version
// edit will never be installed, so these files would be orphans.
if let Some(e) = merger.error() {
for (_, meta) in &new_files {
let orphan = ctx.db_path.join(format!("{:06}.sst", meta.number));
let _ = remove_file(&orphan);
}
return Err(eg!("sub-compaction merge error: {}", e));
}
Ok(SubCompactionOutput {
new_files,
files_produced: next_file_idx,
})
}
impl LeveledCompaction {
/// Decide whether a compaction is needed and, if so, which files to compact.
/// L0 is checked first (by file count), then each intermediate level by its
/// total byte size versus the level's budget.
pub fn pick_compaction(version: &Version, options: &DbOptions) -> Option<CompactionTask> {
    if version.l0_file_count() >= options.l0_compaction_trigger {
        return Self::pick_l0_compaction(version);
    }
    for level in 1..version.num_levels - 1 {
        let size: u64 = version
            .level_files(level)
            .iter()
            .map(|f| f.meta.file_size)
            .sum();
        if size > Self::max_bytes_for_level(options, level) {
            return Self::pick_level_compaction(version, level);
        }
    }
    None
}
/// Build a task that merges every L0 file into the overlapping L1 files.
fn pick_l0_compaction(version: &Version) -> Option<CompactionTask> {
    let input_l0: Vec<TableFile> = version.level_files(0).to_vec();
    if input_l0.is_empty() {
        return None;
    }
    let (lo, hi) = Self::total_key_range(&input_l0);
    let input_l1 = Self::overlapping_files(version.level_files(1), &lo, &hi);
    Some(CompactionTask {
        level: 0,
        input_files_level: input_l0,
        input_files_next: input_l1,
    })
}
/// Compact the single largest file on `level` together with the overlapping
/// files of the level below. Returns `None` when the level is empty.
fn pick_level_compaction(version: &Version, level: usize) -> Option<CompactionTask> {
    // `max_by_key` yields `None` on an empty level, covering the empty case.
    let target = version
        .level_files(level)
        .iter()
        .max_by_key(|f| f.meta.file_size)?;
    let input_level = vec![target.clone()];
    let (lo, hi) = Self::total_key_range(&input_level);
    let next_level = level + 1;
    let input_next = if next_level < version.num_levels {
        Self::overlapping_files(version.level_files(next_level), &lo, &hi)
    } else {
        Vec::new()
    };
    Some(CompactionTask {
        level,
        input_files_level: input_level,
        input_files_next: input_next,
    })
}
/// Pick a compaction restricted to the user-key range `[begin, end)`, where
/// a `None` bound is unbounded on that side.
///
/// L0 is tried first; otherwise each intermediate level is scanned in order
/// and the first level with in-range files is compacted into the next level.
pub fn pick_compaction_for_range(
    version: &Version,
    begin: Option<&[u8]>,
    end: Option<&[u8]>,
) -> Option<CompactionTask> {
    // Shared predicate (previously duplicated for L0 and deeper levels):
    // true when the file's user-key range intersects [begin, end).
    let in_range = |tf: &TableFile| {
        let file_smallest = user_key(&tf.meta.smallest_key);
        let file_largest = user_key(&tf.meta.largest_key);
        begin.is_none_or(|b| file_largest >= b) && end.is_none_or(|e| file_smallest < e)
    };
    let input_l0: Vec<TableFile> = version
        .level_files(0)
        .iter()
        .filter(|tf| in_range(tf))
        .cloned()
        .collect();
    if !input_l0.is_empty() {
        let (smallest, largest) = Self::total_key_range(&input_l0);
        let input_l1 = Self::overlapping_files(version.level_files(1), &smallest, &largest);
        return Some(CompactionTask {
            level: 0,
            input_files_level: input_l0,
            input_files_next: input_l1,
        });
    }
    for level in 1..version.num_levels - 1 {
        let input_level: Vec<TableFile> = version
            .level_files(level)
            .iter()
            .filter(|tf| in_range(tf))
            .cloned()
            .collect();
        if input_level.is_empty() {
            continue;
        }
        let (smallest, largest) = Self::total_key_range(&input_level);
        let next_level = level + 1;
        let input_next = if next_level < version.num_levels {
            Self::overlapping_files(version.level_files(next_level), &smallest, &largest)
        } else {
            Vec::new()
        };
        return Some(CompactionTask {
            level,
            input_files_level: input_level,
            input_files_next: input_next,
        });
    }
    None
}
/// Convenience wrapper around [`Self::execute_compaction_with_cache`] that
/// runs with no rate limiter, no stats, no snapshots, and no table cache.
pub fn execute_compaction(
    task: &CompactionTask,
    versions: &mut VersionSet,
    db_path: &Path,
    options: &DbOptions,
) -> Result<()> {
    Self::execute_compaction_with_cache(
        &CompactionContext {
            db_path,
            options,
            rate_limiter: None,
            stats: None,
            active_snapshots: &[],
        },
        task,
        versions,
        None,
    )
}
/// Run `task` end-to-end: produce the output SSTs, install the version edit,
/// sync the manifest, and delete the now-obsolete input files.
///
/// A single-input task with no next-level overlap and no compaction filter
/// is handled as a "trivial move": the file is re-assigned to the next level
/// in the manifest without rewriting any data.
pub(crate) fn execute_compaction_with_cache(
ctx: &CompactionContext<'_>,
task: &CompactionTask,
versions: &mut VersionSet,
table_cache: Option<&Arc<TableCache>>,
) -> Result<()> {
let target_level = task.level + 1;
if task.input_files_level.len() == 1
&& task.input_files_next.is_empty()
&& ctx.options.compaction_filter.is_none()
{
// Trivial move: relink the one input file one level down.
let tf = &task.input_files_level[0];
let mut edit = VersionEdit::new();
edit.delete_file(task.level as u32, tf.meta.number);
edit.add_file(target_level as u32, tf.meta.clone());
edit.set_next_file_number(versions.next_file_number());
versions.log_and_apply(edit).c(d!())?;
if let Some(s) = ctx.stats {
s.record_compaction_completed();
}
return Ok(());
}
// Reserve a block of file numbers up front so parallel sub-compactions
// can allocate from an atomic counter without touching the version set.
let max_outputs = task.total_input_size() / ctx.options.target_file_size_base
+ ctx.options.max_subcompactions.max(1) as u64;
let file_number_start = versions.reserve_file_numbers(max_outputs);
let version = versions.current();
let all_inputs: Vec<_> = task
.input_files_level
.iter()
.chain(task.input_files_next.iter())
.cloned()
.collect();
let is_bottom =
Self::is_bottommost_level(&version, target_level, ctx.options.num_levels, &all_inputs);
// I/O phase, then manifest installation, then best-effort file cleanup.
let output =
Self::execute_compaction_io(ctx, task, file_number_start, is_bottom).c(d!())?;
let cleanup =
Self::install_compaction(output, versions, table_cache, None, ctx.db_path, ctx.stats)
.c(d!())?;
versions.sync_manifest().c(d!())?;
Self::run_post_compaction_cleanup(&cleanup, ctx.db_path);
Ok(())
}
/// The I/O phase of a compaction: split the key space into sub-compactions,
/// run them (in parallel scoped threads when more than one), and assemble the
/// resulting [`CompactionOutput`] with the version edit to install.
///
/// On any sub-compaction failure, every SST produced by the successful
/// sub-compactions is deleted and the first error is returned.
pub(crate) fn execute_compaction_io(
ctx: &CompactionContext<'_>,
task: &CompactionTask,
file_number_start: u64,
is_bottommost: bool,
) -> Result<CompactionOutput> {
let target_level = task.level + 1;
// With no active snapshots every entry is "older than all snapshots",
// hence the MAX sentinel.
let oldest_snapshot_seq = ctx
.active_snapshots
.iter()
.min()
.copied()
.unwrap_or(SequenceNumber::MAX);
// Per-level compression override, falling back to the global setting.
let target_compression = if !ctx.options.compression_per_level.is_empty()
&& target_level < ctx.options.compression_per_level.len()
{
ctx.options.compression_per_level[target_level]
} else {
ctx.options.compression
};
let build_opts = TableBuildOptions {
block_size: ctx.options.block_size,
block_restart_interval: ctx.options.block_restart_interval,
bloom_bits_per_key: ctx.options.bloom_bits_per_key,
internal_keys: true,
compression: target_compression,
prefix_len: ctx.options.prefix_len,
block_property_collectors: Vec::new(),
};
// L0 files overlap, so an L0 compaction is never sharded.
let max_subs = if task.level == 0 {
1
} else {
ctx.options.max_subcompactions.max(1)
};
let split_points = compute_split_points(task, max_subs);
let actual_subs = split_points.len() + 1;
let sub_tasks = build_sub_tasks(task, &split_points);
let all_input_files: Vec<TableFile> = task
.input_files_level
.iter()
.chain(task.input_files_next.iter())
.cloned()
.collect();
// Range-deletion data is shared by all shards, since a tombstone may
// span shard boundaries.
let all_range_del_entries = collect_range_del_entries(&all_input_files).c(d!())?;
let all_raw_tombstones = collect_raw_tombstones(&all_input_files).c(d!())?;
let file_counter = AtomicU64::new(file_number_start);
let sub_params = SubCompactionParams {
target_level,
is_bottommost,
build_opts: &build_opts,
oldest_snapshot_seq,
file_number_counter: &file_counter,
all_range_del_entries: &all_range_del_entries,
all_raw_tombstones: &all_raw_tombstones,
};
let sub_outputs = if actual_subs <= 1 {
// Single shard: run inline, no thread overhead.
vec![execute_sub_compaction_io(ctx, &sub_tasks[0], &sub_params).c(d!())?]
} else {
// Scoped threads let shards borrow ctx/sub_params without 'static.
let thread_results: Vec<Result<SubCompactionOutput>> = scope(|s| {
let handles: Vec<_> = sub_tasks
.iter()
.map(|sub| s.spawn(|| execute_sub_compaction_io(ctx, sub, &sub_params)))
.collect();
handles
.into_iter()
.map(|h| match h.join() {
Ok(r) => r,
Err(_) => Err(eg!("sub-compaction thread panicked")),
})
.collect()
});
let mut outputs = Vec::with_capacity(thread_results.len());
let mut first_err: Option<Box<dyn ruc::RucError>> = None;
for r in thread_results {
match r {
Ok(sub_out) => outputs.push(sub_out),
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
}
}
}
// Any shard failure aborts the whole compaction: remove the SSTs
// written by the shards that did succeed (they won't be installed).
if let Some(e) = first_err {
for sub_out in &outputs {
for (_, meta) in &sub_out.new_files {
let orphan = ctx.db_path.join(format!("{:06}.sst", meta.number));
let _ = remove_file(&orphan);
}
}
return Err(e);
}
outputs
};
// Assemble the version edit: add every output file, delete every input.
let mut edit = VersionEdit::new();
let mut total_files_produced = 0u64;
for sub_out in sub_outputs {
for file_entry in sub_out.new_files {
edit.new_files.push(file_entry);
}
total_files_produced += sub_out.files_produced;
}
let input_file_numbers: HashSet<u64> = task
.input_files_level
.iter()
.map(|f| f.meta.number)
.chain(task.input_files_next.iter().map(|f| f.meta.number))
.collect();
for tf in &task.input_files_level {
edit.delete_file(task.level as u32, tf.meta.number);
}
for tf in &task.input_files_next {
edit.delete_file(target_level as u32, tf.meta.number);
}
Ok(CompactionOutput {
edit,
input_file_numbers,
files_produced: total_files_produced,
next_file_number_hint: file_counter.load(Ordering::Relaxed),
})
}
/// Install a finished compaction into the version set.
///
/// Before applying, verifies that every input file is still present in the
/// current version; if any is missing (a concurrent operation already
/// consumed it), the produced SSTs are deleted and the install becomes a
/// no-op rather than corrupting the version state. On success, evicts the
/// inputs from the table/block caches and returns their numbers for deletion.
pub fn install_compaction(
mut output: CompactionOutput,
versions: &mut VersionSet,
table_cache: Option<&Arc<TableCache>>,
block_cache: Option<&Arc<BlockCache>>,
db_path: &Path,
stats: Option<&Arc<DbStats>>,
) -> Result<PostCompactionCleanup> {
{
// Staleness check: all inputs must still be live in the current version.
let version = versions.current();
let all_file_numbers: HashSet<u64> = (0..version.num_levels)
.flat_map(|l| version.level_files(l).iter().map(|f| f.meta.number))
.collect();
let stale = output
.input_file_numbers
.iter()
.any(|n| !all_file_numbers.contains(n));
if stale {
// Abandon the compaction: drop its outputs, delete nothing else.
for (_, meta) in &output.edit.new_files {
let orphan = db_path.join(format!("{:06}.sst", meta.number));
let _ = remove_file(&orphan);
}
if let Some(cache) = table_cache {
for (_, meta) in &output.edit.new_files {
cache.evict(meta.number);
}
}
return Ok(PostCompactionCleanup {
files_to_delete: HashSet::new(),
});
}
}
// Make sure the file-number counter covers every number the compaction
// allocated before recording it in the edit.
versions.ensure_file_number_at_least(output.next_file_number_hint);
output
.edit
.set_next_file_number(versions.next_file_number());
versions.log_and_apply(output.edit).c(d!())?;
// Drop cached state for the consumed inputs.
for num in &output.input_file_numbers {
if let Some(cache) = table_cache {
cache.evict(*num);
}
if let Some(bc) = block_cache {
bc.invalidate_file(*num);
}
}
if let Some(s) = stats {
s.record_compaction_completed();
}
Ok(PostCompactionCleanup {
files_to_delete: output.input_file_numbers,
})
}
/// Best-effort deletion of the input SSTs a completed compaction made
/// obsolete; failures are logged and otherwise ignored.
pub fn run_post_compaction_cleanup(cleanup: &PostCompactionCleanup, db_path: &Path) {
    for number in &cleanup.files_to_delete {
        let path = db_path.join(format!("{:06}.sst", number));
        if let Err(err) = remove_file(&path) {
            tracing::warn!("failed to remove old SST {}: {}", path.display(), err);
        }
    }
}
/// Merge ALL files on `level` into fresh files on the SAME level (no level
/// change), deduplicating versions and dropping dead tombstones exactly as
/// the regular compaction path does.
///
/// Unlike `execute_compaction_io` this runs single-threaded, allocates file
/// numbers directly from `versions`, applies + syncs the edit itself, and
/// then evicts caches and deletes the old files. No-op when the level has
/// at most one file.
pub(crate) fn force_merge_level(
ctx: &CompactionContext<'_>,
level: usize,
versions: &mut VersionSet,
table_cache: Option<&Arc<TableCache>>,
block_cache: Option<&Arc<BlockCache>>,
) -> Result<()> {
// No snapshots => every entry is older than all snapshots (MAX sentinel).
let oldest_snapshot_seq = ctx
.active_snapshots
.iter()
.min()
.copied()
.unwrap_or(SequenceNumber::MAX);
let version = versions.current();
let files = version.level_files(level);
if files.len() <= 1 {
return Ok(());
}
let is_bottommost =
Self::is_bottommost_level(&version, level, ctx.options.num_levels, files);
// Merge every file on the level, plus one source of range-del entries.
let mut sources: Vec<IterSource> = Vec::new();
for tf in files {
let iter = TableIterator::new(tf.reader.clone());
sources.push(IterSource::from_boxed(Box::new(iter)));
}
let range_del_entries = collect_range_del_entries(files).c(d!())?;
if !range_del_entries.is_empty() {
sources.push(IterSource::new(range_del_entries));
}
let mut merger = MergingIterator::new(sources, compare_internal_key);
// Per-level compression override, falling back to the global setting.
let compression = if !ctx.options.compression_per_level.is_empty()
&& level < ctx.options.compression_per_level.len()
{
ctx.options.compression_per_level[level]
} else {
ctx.options.compression
};
let build_opts = TableBuildOptions {
block_size: ctx.options.block_size,
block_restart_interval: ctx.options.block_restart_interval,
bloom_bits_per_key: ctx.options.bloom_bits_per_key,
internal_keys: true,
compression,
prefix_len: ctx.options.prefix_len,
block_property_collectors: Vec::new(),
};
let mut edit = VersionEdit::new();
let mut builder: Option<TableBuilder> = None;
let mut current_file_number = 0u64;
let mut current_size = 0usize;
// Deduplication state — mirrors execute_sub_compaction_io.
let mut last_point_key: Option<Vec<u8>> = None;
let mut last_range_del_key: Option<Vec<u8>> = None;
let mut last_written_seq: SequenceNumber = 0;
let mut snapshot_idx: usize = ctx.active_snapshots.len();
let mut range_tombstones = RangeTombstoneTracker::new();
while let Some((ikey, value)) = merger.next_entry() {
// Internal keys carry an 8-byte trailer; shorter keys are malformed.
if ikey.len() < 8 {
continue;
}
let ikr = InternalKeyRef::new(&ikey);
let user_key = ikr.user_key();
if ikr.value_type() == ValueType::RangeDeletion {
range_tombstones.add(user_key.to_vec(), value.as_slice().to_vec(), ikr.sequence());
range_tombstones.reset();
// Skip duplicate tombstone entries from multiple inputs.
if let Some(ref last) = last_range_del_key
&& last.as_slice() == ikey.as_slice()
{
continue;
}
last_range_del_key = Some(ikey.clone());
// Bottommost tombstones older than every snapshot are dead.
if is_bottommost && ikr.sequence() < oldest_snapshot_seq {
continue;
}
} else if let Some(ref last) = last_point_key
&& last.as_slice() == user_key
{
// Older version of the current key: keep only if some active
// snapshot still needs it.
while snapshot_idx > 0 && ctx.active_snapshots[snapshot_idx - 1] >= last_written_seq
{
snapshot_idx -= 1;
}
if snapshot_idx > 0 && ctx.active_snapshots[snapshot_idx - 1] >= ikr.sequence() {
last_written_seq = ikr.sequence();
} else {
continue;
}
} else {
// First (newest) version of a new user key.
last_point_key = Some(user_key.to_vec());
last_written_seq = ikr.sequence();
snapshot_idx = ctx.active_snapshots.len();
if ikr.value_type() == ValueType::Deletion
&& is_bottommost
&& ikr.sequence() < oldest_snapshot_seq
{
continue;
}
// Values covered by a newer range tombstone are dropped.
if ikr.value_type() == ValueType::Value && !range_tombstones.is_empty() {
let entry_seq = ikr.sequence();
if range_tombstones.is_deleted(user_key, entry_seq, oldest_snapshot_seq) {
continue;
}
}
}
// User-supplied compaction filter may keep, drop, or rewrite values.
let mut final_value = value;
if let Some(ref filter) = ctx.options.compaction_filter
&& ikr.value_type() == ValueType::Value
{
match filter.filter(level, user_key, final_value.as_slice()) {
CompactionFilterDecision::Keep => {}
CompactionFilterDecision::Remove => continue,
CompactionFilterDecision::ChangeValue(new_val) => {
final_value = LazyValue::Inline(new_val);
}
}
}
// Lazily open the next output file on first surviving entry.
if builder.is_none() {
current_file_number = versions.new_file_number();
let sst_path = ctx.db_path.join(format!("{:06}.sst", current_file_number));
let mut opts = build_opts.clone();
opts.block_property_collectors = ctx
.options
.block_property_collectors
.iter()
.map(|f| f())
.collect();
builder = Some(TableBuilder::new(&sst_path, opts).c(d!())?);
current_size = 0;
}
// Bottommost values no snapshot needs get their sequence zeroed.
let final_ikey;
let ikey_ref = if is_bottommost
&& ikr.sequence() > 0
&& ikr.sequence() < oldest_snapshot_seq
&& ikr.value_type() == ValueType::Value
{
final_ikey = InternalKey::new(user_key, 0, ikr.value_type())
.as_bytes()
.to_vec();
&final_ikey
} else {
&ikey
};
builder
.as_mut()
.unwrap()
.add(ikey_ref, final_value.as_slice())
.c(d!())?;
let entry_bytes = ikey_ref.len() + final_value.len();
current_size += entry_bytes;
if let Some(rl) = ctx.rate_limiter {
rl.request(entry_bytes);
}
// Roll to a new output file once the size target is reached.
if current_size >= ctx.options.target_file_size_base as usize {
let result = builder.take().unwrap().finish().c(d!())?;
if let Some(s) = ctx.stats {
s.record_compaction_bytes(result.file_size);
}
edit.add_file(
level as u32,
FileMetaData {
number: current_file_number,
file_size: result.file_size,
smallest_key: result.smallest_key.unwrap_or_default(),
largest_key: result.largest_key.unwrap_or_default(),
has_range_deletions: result.has_range_deletions,
},
);
}
}
// Flush the final, partially-filled output file.
if let Some(b) = builder {
let result = b.finish().c(d!())?;
if let Some(s) = ctx.stats {
s.record_compaction_bytes(result.file_size);
}
edit.add_file(
level as u32,
FileMetaData {
number: current_file_number,
file_size: result.file_size,
smallest_key: result.smallest_key.unwrap_or_default(),
largest_key: result.largest_key.unwrap_or_default(),
has_range_deletions: result.has_range_deletions,
},
);
}
// On merge error, delete everything written — the edit is never applied.
if let Some(e) = merger.error() {
for (_, meta) in &edit.new_files {
let orphan = ctx.db_path.join(format!("{:06}.sst", meta.number));
let _ = remove_file(&orphan);
}
return Err(eg!("force_merge iterator error: {}", e));
}
// Replace the old files with the merged output, then persist + clean up.
let input_file_numbers: HashSet<u64> = files.iter().map(|f| f.meta.number).collect();
for tf in files {
edit.delete_file(level as u32, tf.meta.number);
}
edit.set_next_file_number(versions.next_file_number());
versions.log_and_apply(edit).c(d!())?;
versions.sync_manifest().c(d!())?;
if let Some(cache) = table_cache {
for num in &input_file_numbers {
cache.evict(*num);
}
}
if let Some(bc) = block_cache {
for num in &input_file_numbers {
bc.invalidate_file(*num);
}
}
// Best-effort deletion of the merged-away inputs.
for num in &input_file_numbers {
let old_path = ctx.db_path.join(format!("{:06}.sst", num));
if let Err(e) = remove_file(&old_path) {
tracing::warn!("failed to remove old SST {}: {}", old_path.display(), e);
}
}
if let Some(s) = ctx.stats {
s.record_compaction_completed();
}
Ok(())
}
/// Turn a read-based compaction hint into a task. Hints for L0 or the last
/// level are ignored, as are levels holding at most one file.
pub fn pick_compaction_for_hint(
    version: &Version,
    hint: &CompactionHint,
) -> Option<CompactionTask> {
    let level = hint.level;
    let level_is_eligible = level != 0 && level < version.num_levels.saturating_sub(1);
    if !level_is_eligible || version.level_files(level).len() <= 1 {
        return None;
    }
    Self::pick_level_compaction(version, level)
}
/// Byte budget for `level`: the base size scaled by the multiplier once per
/// level beyond level 1 (level 1 gets the base budget unchanged).
fn max_bytes_for_level(options: &DbOptions, level: usize) -> u64 {
    (1..level).fold(options.max_bytes_for_level_base, |bytes, _| {
        (bytes as f64 * options.max_bytes_for_level_multiplier) as u64
    })
}
/// Smallest and largest internal keys spanned by `files`; both are empty
/// vectors when `files` is empty.
fn total_key_range(files: &[TableFile]) -> (Vec<u8>, Vec<u8>) {
    let mut smallest: Vec<u8> = Vec::new();
    let mut largest: Vec<u8> = Vec::new();
    for file in files {
        let lo = &file.meta.smallest_key;
        if smallest.is_empty() || compare_internal_key(lo, &smallest) == CmpOrdering::Less {
            smallest = lo.clone();
        }
        let hi = &file.meta.largest_key;
        if largest.is_empty() || compare_internal_key(hi, &largest) == CmpOrdering::Greater {
            largest = hi.clone();
        }
    }
    (smallest, largest)
}
/// True when no file on any level deeper than `target_level` overlaps the
/// user-key range of `all_inputs` — i.e. the compaction output lands at the
/// logical bottom and older versions cannot exist below it.
pub(crate) fn is_bottommost_level(
    version: &Version,
    target_level: usize,
    num_levels: usize,
    all_inputs: &[TableFile],
) -> bool {
    if target_level >= num_levels - 1 {
        return true;
    }
    let (smallest, largest) = Self::total_key_range(all_inputs);
    if smallest.is_empty() {
        return true;
    }
    let lo = user_key(&smallest);
    let hi = user_key(&largest);
    // Any overlapping file on a deeper level means data for these keys may
    // still live below, so the compaction is not bottommost.
    let deeper_overlap = ((target_level + 1)..num_levels).any(|level| {
        version.level_files(level).iter().any(|f| {
            user_key(&f.meta.largest_key) >= lo && user_key(&f.meta.smallest_key) <= hi
        })
    });
    !deeper_overlap
}
/// Files whose user-key range intersects the closed range defined by the
/// internal keys `smallest` and `largest`.
fn overlapping_files(files: &[TableFile], smallest: &[u8], largest: &[u8]) -> Vec<TableFile> {
    let lo = user_key(smallest);
    let hi = user_key(largest);
    let mut out = Vec::new();
    for f in files {
        let f_lo = user_key(&f.meta.smallest_key);
        let f_hi = user_key(&f.meta.largest_key);
        if f_hi >= lo && f_lo <= hi {
            out.push(f.clone());
        }
    }
    out
}
}
#[cfg(test)]
mod tests {
use crate::db::DB;
use crate::options::DbOptions;
// A tiny write buffer forces frequent flushes; with the L0 trigger at 4,
// background compaction should run, and all 200 keys must remain readable.
#[test]
fn test_compaction_trigger() {
let dir = tempfile::tempdir().unwrap();
let opts = DbOptions {
create_if_missing: true,
write_buffer_size: 512,
l0_compaction_trigger: 4,
target_file_size_base: 1024 * 1024,
..Default::default()
};
let db = DB::open(opts.clone(), dir.path()).unwrap();
for i in 0..200 {
let key = format!("key_{:06}", i);
let val = format!("value_{:040}", i);
db.put(key.as_bytes(), val.as_bytes()).unwrap();
}
// At least one SST must exist on disk after the flushes/compactions.
let sst_count = std::fs::read_dir(dir.path())
.unwrap()
.filter(|e| {
e.as_ref()
.unwrap()
.file_name()
.to_string_lossy()
.ends_with(".sst")
})
.count();
assert!(sst_count > 0, "should have SST files");
for i in 0..200 {
let key = format!("key_{:06}", i);
let val = format!("value_{:040}", i);
assert_eq!(
db.get(key.as_bytes()).unwrap(),
Some(val.into_bytes()),
"failed at key {}",
i
);
}
}
// With the automatic trigger set too high to fire, explicit `compact()` must
// merge the five flushed batches and keep every key readable.
#[test]
fn test_manual_compaction() {
let dir = tempfile::tempdir().unwrap();
let opts = DbOptions {
create_if_missing: true,
write_buffer_size: 512,
l0_compaction_trigger: 100, ..Default::default()
};
let db = DB::open(opts.clone(), dir.path()).unwrap();
for batch in 0..5 {
for i in 0..20 {
let key = format!("key_{:04}", batch * 20 + i);
let val = format!("val_{}", batch * 20 + i);
db.put(key.as_bytes(), val.as_bytes()).unwrap();
}
db.flush().unwrap();
}
db.compact().unwrap();
for i in 0..100 {
let key = format!("key_{:04}", i);
let val = format!("val_{}", i);
assert_eq!(
db.get(key.as_bytes()).unwrap(),
Some(val.into_bytes()),
"failed at key {} after compaction",
i
);
}
}
// Deleted keys must stay invisible after compaction merges the tombstone
// layer with the value layer; undeleted keys must survive.
#[test]
fn test_compaction_removes_tombstones() {
let dir = tempfile::tempdir().unwrap();
let opts = DbOptions {
create_if_missing: true,
write_buffer_size: 512,
l0_compaction_trigger: 100,
num_levels: 2, ..Default::default()
};
let db = DB::open(opts, dir.path()).unwrap();
for i in 0..20 {
let key = format!("key_{:04}", i);
db.put(key.as_bytes(), b"value").unwrap();
}
db.flush().unwrap();
for i in 0..10 {
let key = format!("key_{:04}", i);
db.delete(key.as_bytes()).unwrap();
}
db.flush().unwrap();
db.compact().unwrap();
for i in 0..10 {
let key = format!("key_{:04}", i);
assert_eq!(db.get(key.as_bytes()).unwrap(), None);
}
for i in 10..20 {
let key = format!("key_{:04}", i);
assert_eq!(db.get(key.as_bytes()).unwrap(), Some(b"value".to_vec()));
}
}
// When two flushed layers hold different versions of the same keys,
// compaction must keep only the newest value (v2) for every key.
#[test]
fn test_compaction_with_overwrites() {
let dir = tempfile::tempdir().unwrap();
let opts = DbOptions {
create_if_missing: true,
write_buffer_size: 512,
l0_compaction_trigger: 100,
..Default::default()
};
let db = DB::open(opts, dir.path()).unwrap();
for i in 0..20 {
let key = format!("key_{:04}", i);
db.put(key.as_bytes(), b"v1").unwrap();
}
db.flush().unwrap();
for i in 0..20 {
let key = format!("key_{:04}", i);
db.put(key.as_bytes(), b"v2").unwrap();
}
db.flush().unwrap();
db.compact().unwrap();
for i in 0..20 {
let key = format!("key_{:04}", i);
assert_eq!(
db.get(key.as_bytes()).unwrap(),
Some(b"v2".to_vec()),
"key {} should have value v2 after compaction",
i
);
}
}
// A small target file size plus max_subcompactions=4 exercises the parallel
// sub-compaction path; all 400 keys must survive the sharded merge.
#[test]
fn test_sub_compaction_correctness() {
let dir = tempfile::tempdir().unwrap();
let opts = DbOptions {
create_if_missing: true,
write_buffer_size: 512,
l0_compaction_trigger: 100,
target_file_size_base: 512, max_subcompactions: 4,
..Default::default()
};
let db = DB::open(opts, dir.path()).unwrap();
for batch in 0..8 {
for i in 0..50 {
let key = format!("key_{:06}", batch * 50 + i);
let val = format!("value_{:040}", batch * 50 + i);
db.put(key.as_bytes(), val.as_bytes()).unwrap();
}
db.flush().unwrap();
}
db.compact().unwrap();
for i in 0..400 {
let key = format!("key_{:06}", i);
let val = format!("value_{:040}", i);
assert_eq!(
db.get(key.as_bytes()).unwrap(),
Some(val.into_bytes()),
"key {} missing after sub-compaction",
i
);
}
}
#[test]
fn test_sub_compaction_with_deletions() {
    // Interleave point deletes (even keys) and overwrites (odd keys) across
    // three SSTs, then verify the merged state after a sub-compacted run.
    let dir = tempfile::tempdir().unwrap();
    let opts = DbOptions {
        create_if_missing: true,
        write_buffer_size: 512,
        l0_compaction_trigger: 100,
        target_file_size_base: 512,
        max_subcompactions: 4,
        num_levels: 2,
        ..Default::default()
    };
    let db = DB::open(opts, dir.path()).unwrap();

    // Base SST: 100 keys at v1.
    for n in 0..100 {
        db.put(format!("key_{:06}", n).as_bytes(), b"v1").unwrap();
    }
    db.flush().unwrap();

    // Second SST: tombstone every even key.
    for n in (0..100).step_by(2) {
        db.delete(format!("key_{:06}", n).as_bytes()).unwrap();
    }
    db.flush().unwrap();

    // Third SST: rewrite every odd key to v2.
    for n in (1..100).step_by(2) {
        db.put(format!("key_{:06}", n).as_bytes(), b"v2").unwrap();
    }
    db.flush().unwrap();
    db.compact().unwrap();

    // Even keys are deleted; odd keys carry the overwrite.
    for n in (0..100).step_by(2) {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            None,
            "key {} should be deleted",
            n
        );
    }
    for n in (1..100).step_by(2) {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            Some(b"v2".to_vec()),
            "key {} should be v2",
            n
        );
    }
}
#[test]
fn test_sub_compaction_with_range_tombstones() {
    // A range tombstone covering the middle of the keyspace must still be
    // applied correctly when the compaction is split into sub-compactions.
    let dir = tempfile::tempdir().unwrap();
    let opts = DbOptions {
        create_if_missing: true,
        write_buffer_size: 512,
        l0_compaction_trigger: 100,
        target_file_size_base: 512,
        max_subcompactions: 4,
        num_levels: 2,
        ..Default::default()
    };
    let db = DB::open(opts, dir.path()).unwrap();

    // 200 live keys, then one range tombstone over keys 50..150.
    for n in 0..200 {
        db.put(format!("key_{:06}", n).as_bytes(), b"val").unwrap();
    }
    db.flush().unwrap();
    db.delete_range(b"key_000050", b"key_000150").unwrap();
    db.flush().unwrap();
    db.compact().unwrap();

    // Keys on either side of the tombstoned range remain readable.
    for n in (0..50).chain(150..200) {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            Some(b"val".to_vec()),
            "key {} should survive",
            n
        );
    }
    // Everything inside the range is gone.
    for n in 50..150 {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            None,
            "key {} should be deleted",
            n
        );
    }
}
#[test]
fn test_sub_compaction_with_snapshots() {
    // A snapshot taken between the v1 and v2 generations must keep seeing
    // v1 even after the newer entries have been compacted in.
    let dir = tempfile::tempdir().unwrap();
    let opts = DbOptions {
        create_if_missing: true,
        write_buffer_size: 512,
        l0_compaction_trigger: 100,
        target_file_size_base: 512,
        max_subcompactions: 4,
        ..Default::default()
    };
    let db = DB::open(opts, dir.path()).unwrap();

    // Generation 1, then pin it with a snapshot.
    for n in 0..100 {
        db.put(format!("key_{:06}", n).as_bytes(), b"v1").unwrap();
    }
    db.flush().unwrap();
    let snap = db.snapshot();
    let snap_opts = snap.read_options();

    // Generation 2 on top, then compact both together.
    for n in 0..100 {
        db.put(format!("key_{:06}", n).as_bytes(), b"v2").unwrap();
    }
    db.flush().unwrap();
    db.compact().unwrap();

    // Plain reads see the latest version.
    for n in 0..100 {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            Some(b"v2".to_vec()),
            "current key {} should be v2",
            n
        );
    }
    // Reads through the snapshot still see the pinned version.
    for n in 0..100 {
        assert_eq!(
            db.get_with_options(&snap_opts, format!("key_{:06}", n).as_bytes())
                .unwrap(),
            Some(b"v1".to_vec()),
            "snapshot key {} should be v1",
            n
        );
    }
    // Release the snapshot only after every read has completed.
    drop(snap);
}
#[test]
fn test_sub_compaction_end_to_end() {
    // Two full compaction rounds: 500 original keys, then the first 250
    // rewritten; both generations must read back correctly afterwards.
    let dir = tempfile::tempdir().unwrap();
    let opts = DbOptions {
        create_if_missing: true,
        write_buffer_size: 512,
        l0_compaction_trigger: 100,
        target_file_size_base: 512,
        max_subcompactions: 4,
        ..Default::default()
    };
    let db = DB::open(opts, dir.path()).unwrap();

    // Round 1: ten flushed batches of 50 keys (0..500), then compact.
    for start in (0..500).step_by(50) {
        for n in start..start + 50 {
            db.put(
                format!("key_{:06}", n).as_bytes(),
                format!("value_{:040}", n).as_bytes(),
            )
            .unwrap();
        }
        db.flush().unwrap();
    }
    db.compact().unwrap();

    // Round 2: rewrite the first 250 keys in five batches, then compact again.
    for start in (0..250).step_by(50) {
        for n in start..start + 50 {
            db.put(
                format!("key_{:06}", n).as_bytes(),
                format!("updated_{:040}", n).as_bytes(),
            )
            .unwrap();
        }
        db.flush().unwrap();
    }
    db.compact().unwrap();

    // Rewritten half reads back the updated values...
    for n in 0..250 {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            Some(format!("updated_{:040}", n).into_bytes()),
            "key {} should have updated value",
            n
        );
    }
    // ...while the untouched half keeps the originals.
    for n in 250..500 {
        assert_eq!(
            db.get(format!("key_{:06}", n).as_bytes()).unwrap(),
            Some(format!("value_{:040}", n).into_bytes()),
            "key {} should have original value",
            n
        );
    }
}
}