use super::{CompactionAction, CompactionResult, CompactionStrategy, Input as CompactionPayload};
use crate::{
BlobFile, Config, HashSet, InternalValue, SeqNo, SequenceNumberCounter,
SharedSequenceNumberGenerator, Table, TableId, UserKey,
blob_tree::FragmentationMap,
compaction::{
Choice,
filter::{Context, StreamFilterAdapter},
flavour::{RelocatingCompaction, StandardCompaction},
state::CompactionState,
stream::CompactionStream,
},
file::BLOBS_FOLDER,
merge::Merger,
run_scanner::RunScanner,
stop_signal::StopSignal,
tree::inner::TreeId,
version::{Run, SuperVersions, Version},
vlog::{BlobFileMergeScanner, BlobFileScanner, BlobFileWriter},
};
use std::{
sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard},
time::Instant,
};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// Type-erased iterator over the internal values of one compaction input
/// (a single table scan or a run scanner), boxed so heterogeneous readers can
/// be merged together.
pub type CompactionReader<'a> = Box<dyn Iterator<Item = crate::Result<InternalValue>> + 'a>;

/// Input-size threshold of 8 MiB (`8 << 20` bytes).
///
/// NOTE(review): not referenced in this file; presumably the default for
/// `Config::subcompaction_min_bytes` — confirm.
#[cfg(feature = "std")]
pub const SUBCOMPACTION_MIN_INPUT_BYTES: u64 = 8 << 20;
/// Everything a compaction run needs, typically cloned from the owning tree
/// via `Options::from_tree`.
#[derive(Clone)]
pub struct Options {
/// Identifier of the tree being compacted; handed to blob file writers.
pub tree_id: TreeId,
/// Tree-wide sequence number generator, passed to version upgrades.
pub global_seqno: SharedSequenceNumberGenerator,
/// Visible sequence number generator, passed to version upgrades.
pub visible_seqno: SharedSequenceNumberGenerator,
/// Counter for new table IDs.
/// NOTE(review): not read in this file; presumably consumed by the table writer — confirm.
pub table_id_generator: SequenceNumberCounter,
/// Counter for new blob file IDs, used when creating `BlobFileWriter`s.
pub blob_file_id_generator: SequenceNumberCounter,
/// Tree configuration.
pub config: Arc<Config>,
/// Shared version history: read to choose work, write-locked to install results.
pub version_history: Arc<RwLock<SuperVersions>>,
/// Strategy consulted by `do_compaction` to pick the next task.
pub strategy: Arc<dyn CompactionStrategy>,
/// Checked before and during merges so work can stop early.
pub stop_signal: StopSignal,
/// Sequence number watermark used as the stream eviction seqno, for range
/// tombstone GC and for manifest maintenance.
pub mvcc_gc_watermark: u64,
/// Shared compaction state; its hidden set marks tables owned by in-flight compactions.
pub compaction_state: Arc<Mutex<CompactionState>>,
/// Runtime configuration handle; a snapshot is passed to each version upgrade.
pub runtime_config: Arc<crate::runtime_config::handle::RuntimeConfigHandle>,
/// Optional encryption provider, passed to version upgrades.
pub encryption: Option<Arc<dyn crate::encryption::EncryptionProvider>>,
/// Throttles merges: each merged item requests its key + value byte count.
pub rate_limiter: Arc<crate::rate_limiter::RateLimiter>,
/// Metrics sink shared with the tree.
#[cfg(feature = "metrics")]
pub metrics: Arc<Metrics>,
}
impl Options {
    /// Builds compaction options that share `tree`'s sequence counters,
    /// configuration, version history, stop signal, compaction state and
    /// runtime configuration, with `strategy` choosing the work.
    ///
    /// The MVCC GC watermark starts at `0`. A fresh rate limiter is created
    /// from the tree's `compaction_rate_limit`.
    pub fn from_tree(tree: &crate::Tree, strategy: Arc<dyn CompactionStrategy>) -> Self {
        let config = &tree.config;
        let rate_limiter = Arc::new(crate::rate_limiter::RateLimiter::new(
            config.compaction_rate_limit,
        ));

        Self {
            tree_id: tree.id,
            global_seqno: config.seqno.clone(),
            visible_seqno: config.visible_seqno.clone(),
            table_id_generator: tree.table_id_counter.clone(),
            blob_file_id_generator: tree.blob_file_id_counter.clone(),
            config: Arc::clone(config),
            version_history: tree.version_history.clone(),
            strategy,
            stop_signal: tree.stop_signal.clone(),
            mvcc_gc_watermark: 0,
            compaction_state: tree.compaction_state.clone(),
            runtime_config: tree.runtime_config.clone(),
            encryption: config.encryption.clone(),
            rate_limiter,
            #[cfg(feature = "metrics")]
            metrics: tree.metrics.clone(),
        }
    }
}
/// Runs one compaction step: asks the configured strategy for a [`Choice`]
/// and carries it out.
///
/// The compaction-state mutex and a read lock on the version history are held
/// while the strategy decides. Each branch then takes over or releases them.
/// When `level_routes` is configured and any table of a move lives outside the
/// destination level's folder, the move is executed as a merge instead.
///
/// # Errors
///
/// Propagates errors from the merge, move or drop that was executed.
pub fn do_compaction(opts: &Options) -> crate::Result<CompactionResult> {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let compaction_state = opts.compaction_state.lock().expect("lock is poisoned");

    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let version_history_lock = opts.version_history.read().expect("lock is poisoned");

    let decision_started = Instant::now();
    log::trace!(
        "Consulting compaction strategy {:?}",
        opts.strategy.get_name(),
    );
    let choice = opts.strategy.choose(
        &version_history_lock.latest_version().version,
        &opts.config,
        &compaction_state,
    );
    log::debug!(
        "Compaction choice: {choice:?} in {:?}",
        decision_started.elapsed()
    );

    match choice {
        Choice::DoNothing => {
            log::trace!("Compactor chose to do nothing");
            Ok(CompactionResult::nothing())
        }
        Choice::Drop(payload) => {
            drop(version_history_lock);
            let ids: Vec<_> = payload.into_iter().collect();
            drop_tables(compaction_state, opts, &ids)
        }
        Choice::Merge(payload) => {
            merge_tables(compaction_state, version_history_lock, opts, &payload)
        }
        Choice::Move(payload) => {
            if opts.config.level_routes.is_some() {
                let (dst_folder, _) = opts.config.tables_folder_for_level(payload.dest_level);
                let version = &version_history_lock.latest_version().version;
                // A trivial move only rewrites metadata, so any table that sits
                // in another folder forces a real rewrite.
                let leaves_folder = version
                    .iter_levels()
                    .flat_map(|level| level.iter())
                    .flat_map(|run| run.iter())
                    .any(|table| {
                        payload.table_ids.contains(&table.id())
                            && table.path.parent() != Some(dst_folder.as_path())
                    });
                if leaves_folder {
                    log::debug!("Converting trivial move to merge: cross-folder level routing");
                    return merge_tables(compaction_state, version_history_lock, opts, &payload);
                }
            }
            drop(version_history_lock);
            move_tables(&compaction_state, opts, &payload)
        }
    }
}
/// Returns the indexes of the first and last tables in `run` whose IDs appear
/// in `to_compact`, or `None` if the run contains none of them.
///
/// Tables between the two indexes are not checked, so the span may include
/// tables that are not in `to_compact`.
fn pick_run_indexes(run: &Run<Table>, to_compact: &[TableId]) -> Option<(usize, usize)> {
    run.iter()
        .enumerate()
        .filter(|(_, table)| to_compact.contains(&table.id()))
        .fold(None, |span, (idx, _)| match span {
            None => Some((idx, idx)),
            Some((first, _)) => Some((first, idx)),
        })
}
/// Builds a single merge stream over the tables listed in `to_compact`.
///
/// Runs with more than one table are read through a culled `RunScanner`
/// spanning the first to last selected table (see `pick_run_indexes`). Tables
/// in single-table runs are scanned individually.
///
/// Returns `Ok(None)` when the number of tables covered differs from
/// `to_compact.len()`.
/// NOTE(review): a run's span counts every table between its first and last
/// selected table, so this assumes the selected tables of a run are contiguous.
///
/// # Errors
///
/// Propagates errors from opening a run scanner or a table scan.
fn create_compaction_stream<'a>(
    version: &Version,
    to_compact: &[TableId],
    eviction_seqno: SeqNo,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> crate::Result<Option<CompactionStream<'a, Merger<CompactionReader<'a>>>>> {
    let mut readers: Vec<CompactionReader<'_>> = Vec::new();
    let mut covered = 0;

    for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
        if run.len() <= 1 {
            for table in run.iter() {
                if to_compact.contains(&table.metadata.id) {
                    covered += 1;
                    readers.push(Box::new(table.scan()?));
                }
            }
            continue;
        }

        if let Some((lo, hi)) = pick_run_indexes(run, to_compact) {
            let scanner = RunScanner::culled(run.clone(), (Some(lo), Some(hi)))?;
            readers.push(Box::new(scanner));
            covered += hi - lo + 1;
        }
    }

    if covered != to_compact.len() {
        return Ok(None);
    }

    let merger = Merger::new(readers, comparator);
    Ok(Some(
        CompactionStream::new(merger, eviction_seqno).with_merge_operator(merge_operator),
    ))
}
/// Builds a merge stream over the tables in `to_compact`, reading only the
/// portion of each table that falls inside `bounds`.
///
/// Every selected table is opened with `Table::range`. Returns `None` when
/// fewer (or more) tables were found in `version` than `to_compact` lists.
#[cfg(feature = "std")]
fn create_bounded_compaction_stream<'a>(
    version: &'a Version,
    to_compact: &HashSet<TableId>,
    bounds: (std::ops::Bound<UserKey>, std::ops::Bound<UserKey>),
    eviction_seqno: SeqNo,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> Option<CompactionStream<'a, Merger<CompactionReader<'a>>>> {
    let readers: Vec<CompactionReader<'a>> = version
        .iter_levels()
        .flat_map(|lvl| lvl.iter())
        .flat_map(|run| run.iter())
        .filter(|table| to_compact.contains(&table.metadata.id))
        .map(|table| Box::new(table.range(bounds.clone())) as CompactionReader<'a>)
        .collect();

    (readers.len() == to_compact.len()).then(|| {
        CompactionStream::new(Merger::new(readers, comparator), eviction_seqno)
            .with_merge_operator(merge_operator)
    })
}
/// Gathers (clones of) every range tombstone stored in any table of
/// `version`, in level → run → table order. Duplicates are not removed.
#[cfg(feature = "std")]
fn collect_version_tombstones(version: &Version) -> Vec<crate::range_tombstone::RangeTombstone> {
    let mut tombstones = Vec::new();
    for level in version.iter_levels() {
        for run in level.iter() {
            for table in run.iter() {
                tombstones.extend(table.range_tombstones().iter().cloned());
            }
        }
    }
    tombstones
}
/// Decides which input range tombstones must be carried into the compaction
/// output.
///
/// Outside the last level every tombstone is kept. On the last level a
/// tombstone is dropped only when it is visible at `watermark` and does not
/// overlap the key range of any table outside `input_ids`. Overlap is tested
/// as `start <= table.max && table.min < end` under `comparator`, i.e. `end`
/// is treated as exclusive.
#[cfg(feature = "std")]
fn range_tombstones_after_gc(
    input_rts: &[crate::range_tombstone::RangeTombstone],
    version: &Version,
    input_ids: &HashSet<TableId>,
    watermark: SeqNo,
    is_last_level: bool,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<crate::range_tombstone::RangeTombstone> {
    if is_last_level {
        let cmp = comparator.as_ref();

        // True when a table that is not part of this compaction still
        // intersects the tombstone's key range.
        let overlaps_foreign_table = |rt: &crate::range_tombstone::RangeTombstone| {
            version
                .iter_levels()
                .flat_map(|level| level.iter())
                .flat_map(|run| run.iter())
                .any(|t| {
                    if input_ids.contains(&t.id()) {
                        return false;
                    }
                    let kr = &t.metadata.key_range;
                    cmp.compare(&rt.start, kr.max()).is_le()
                        && cmp.compare(kr.min(), &rt.end).is_lt()
                })
        };

        input_rts
            .iter()
            .filter(|&rt| !rt.visible_at(watermark) || overlaps_foreign_table(rt))
            .cloned()
            .collect()
    } else {
        input_rts.to_vec()
    }
}
/// Turns table maximum keys into candidate split points.
///
/// The keys are stable-sorted and de-duplicated with `comparator`, so keys it
/// considers equal collapse to the first one in sorted order. The greatest
/// remaining candidate is then discarded. Fewer than two input keys yield an
/// empty list.
fn boundary_candidates(
    mut keys: Vec<UserKey>,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<UserKey> {
    if keys.len() >= 2 {
        keys.sort_by(|lhs, rhs| comparator.compare(lhs, rhs));
        keys.dedup_by(|current, previous| comparator.compare(current, previous).is_eq());
        keys.pop();
        keys
    } else {
        Vec::new()
    }
}
/// Chooses up to `max_ranges - 1` split keys for a parallel merge.
///
/// Candidates are the maximum keys of the tables on `dest_level` (see
/// `boundary_candidates`). If there are no more candidates than wanted, all
/// of them are used. Otherwise they are sampled at evenly spaced indexes.
/// Returns an empty list when `max_ranges < 2`, when the level does not
/// exist, or when there are no candidates.
#[cfg(feature = "std")]
fn subcompaction_boundaries(
    version: &Version,
    dest_level: usize,
    max_ranges: usize,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<UserKey> {
    if max_ranges < 2 {
        return Vec::new();
    }
    let Some(level) = version.level(dest_level) else {
        return Vec::new();
    };

    let table_maxima: Vec<UserKey> = level
        .iter()
        .flat_map(|run| run.iter())
        .map(|table| table.metadata.key_range.max().clone())
        .collect();
    let candidates = boundary_candidates(table_maxima, comparator);

    let wanted = (max_ranges - 1).min(candidates.len());
    if wanted == candidates.len() {
        // Also covers the empty case: every candidate (possibly none) is a boundary.
        return candidates;
    }

    let total = candidates.len();
    let mut picked: Vec<UserKey> = (1..=wanted)
        .filter_map(|i| {
            let idx = ((i * total) / (wanted + 1)).min(total - 1);
            candidates.get(idx).cloned()
        })
        .collect();
    picked.dedup();
    picked
}
/// Splits the whole key space at `boundaries` into consecutive ranges.
///
/// For boundaries `b0 < b1 < …` the result is `(.., b0)`, `[b0, b1)`, …,
/// `[b_last, ..)`. An empty slice yields a single unbounded range.
#[cfg(feature = "std")]
fn ranges_from_boundaries(
    boundaries: &[UserKey],
) -> Vec<(std::ops::Bound<UserKey>, std::ops::Bound<UserKey>)> {
    use std::ops::Bound;

    let mut ranges = Vec::with_capacity(boundaries.len() + 1);
    let mut lower: Bound<UserKey> = Bound::Unbounded;

    for split in boundaries {
        let upper = Bound::Excluded(split.clone());
        let previous_lower = std::mem::replace(&mut lower, Bound::Included(split.clone()));
        ranges.push((previous_lower, upper));
    }

    ranges.push((lower, Bound::Unbounded));
    ranges
}
/// Error returned when a sub-compaction is cancelled: an I/O error of kind
/// `Interrupted`.
#[cfg(feature = "std")]
fn cancelled_compaction() -> crate::Error {
    let interrupted = std::io::Error::new(
        std::io::ErrorKind::Interrupted,
        "sub-compaction cancelled by stop signal",
    );
    crate::Error::Io(interrupted)
}
/// Runs one key-range slice of a parallel merge and returns its produced,
/// not yet installed, output.
///
/// Only the part of each input table that falls inside `bounds` is read.
/// `tables_for_deletion` is handed to the output writer. In this file the
/// caller gives the input tables to the first slice only and empty vectors to
/// the others.
///
/// # Errors
///
/// - an `Interrupted` I/O error (`cancelled_compaction`) when the stop signal
///   interrupts rate limiting or is seen at a periodic check, or when the
///   test-only `fail_one_subcompaction` failpoint is armed;
/// - an I/O error if the input tables can no longer all be found in `version`;
/// - any error from reading inputs or writing/producing the output. If
///   `produce` fails, blob files written by the compaction filter are marked
///   as deleted.
#[cfg(feature = "std")]
#[expect(
clippy::too_many_arguments,
reason = "a sub-compaction needs the full compaction context (opts, payload, \
version, inputs, range, level info, blob folder) threaded in by value/ref \
so it can run on its own thread; bundling into a struct would just move \
the argument list"
)]
fn run_subcompaction(
opts: &Options,
payload: &CompactionPayload,
version: &Version,
tables_for_deletion: Vec<Table>,
input_range_tombstones: &[crate::range_tombstone::RangeTombstone],
bounds: (std::ops::Bound<UserKey>, std::ops::Bound<UserKey>),
dst_lvl: usize,
is_last_level: bool,
blobs_folder: &std::path::Path,
) -> crate::Result<super::flavour::ProducedOutput> {
use super::flavour::CompactionFlavour;
// Test-only failpoint: `swap(false)` disarms it, so only the first
// sub-compaction that observes it fails.
#[cfg(test)]
if opts
.config
.fail_one_subcompaction
.swap(false, std::sync::atomic::Ordering::SeqCst)
{
return Err(cancelled_compaction());
}
let mut blob_frag_map = FragmentationMap::default();
let Some(mut merge_iter) = create_bounded_compaction_stream(
version,
&payload.table_ids,
bounds,
opts.mvcc_gc_watermark,
opts.config.merge_operator.clone(),
opts.config.comparator.clone(),
) else {
return Err(crate::Error::Io(std::io::Error::other(
"sub-compaction input tables disappeared mid-flight",
)));
};
// On the last level, every range tombstone in the version is applied to the
// merged data and also handed to the seqno zeroer below.
let version_tombstones = if is_last_level {
collect_version_tombstones(version)
} else {
Vec::new()
};
merge_iter = merge_iter
.evict_tombstones(is_last_level)
.zero_seqnos(false);
if is_last_level {
merge_iter = merge_iter.with_range_tombstone_application(
version_tombstones.clone(),
opts.config.comparator.clone(),
);
}
let filter_ctx = Context { is_last_level };
let mut compaction_filter = opts
.config
.compaction_filter_factory
.as_ref()
.map(|f| f.make_filter(&filter_ctx));
// With KV separation, dropped entries are reported into `blob_frag_map`,
// which is later passed to `produce`.
if opts.config.kv_separation_opts.is_some() {
merge_iter = merge_iter.with_drop_callback(&mut blob_frag_map);
}
// Blob writer the compaction filter may open lazily; finished after the merge.
let mut filter_blob_writer = None;
let merge_iter = merge_iter.with_filter(StreamFilterAdapter::new(
compaction_filter.as_deref_mut(),
opts,
version,
blobs_folder,
&mut filter_blob_writer,
&filter_ctx,
));
let merge_iter = super::seqno_zeroer::BottommostSeqnoZeroer::new(
merge_iter,
is_last_level,
version_tombstones,
opts.mvcc_gc_watermark,
opts.config.comparator.clone(),
);
// NOTE(review): passes `false` where the single-stream path in `merge_tables`
// passes `true` — confirm the meaning of this flag.
let table_writer = super::flavour::prepare_table_writer(version, opts, payload, false)?;
let mut compactor: Box<dyn CompactionFlavour> =
Box::new(StandardCompaction::new(table_writer, tables_for_deletion));
// Only tombstones that survive last-level GC are written; every slice
// writes the same set.
let output_rts = range_tombstones_after_gc(
input_range_tombstones,
version,
&payload.table_ids,
opts.mvcc_gc_watermark,
is_last_level,
&opts.config.comparator,
);
if !output_rts.is_empty() {
compactor.write_range_tombstones(&output_rts);
}
for (idx, item) in merge_iter.enumerate() {
let item = item?;
// Charge key + value bytes to the rate limiter; a `true` result (the wait
// was interrupted via the stop-signal predicate) cancels the slice.
let io_bytes = (item.key.user_key.len() as u64).saturating_add(item.value.len() as u64);
if opts
.rate_limiter
.request_interruptible(io_bytes, || opts.stop_signal.is_stopped())
{
return Err(cancelled_compaction());
}
compactor.write(item)?;
// Periodic stop check: every 1,000,000 items, starting with the first.
if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() {
return Err(cancelled_compaction());
}
}
// Success path only: let the filter finish, then seal any blob file it wrote.
if let Some(filter) = compaction_filter {
filter.finish();
}
let extra_blob_files = filter_blob_writer
.map(BlobFileWriter::finish)
.transpose()?
.unwrap_or_default();
let rollback_extra_blob_files = extra_blob_files.clone();
compactor
.produce(opts, dst_lvl, blob_frag_map, extra_blob_files)
.inspect_err(|_| {
for blob_file in &rollback_extra_blob_files {
blob_file.mark_as_deleted();
}
})
}
#[expect(
clippy::significant_drop_tightening,
reason = "version_history_lock must be held across upgrade_version and maintenance"
)]
fn move_tables(
compaction_state: &MutexGuard<'_, CompactionState>,
opts: &Options,
payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");
if compaction_state
.hidden_set()
.should_decline_compaction(payload.table_ids.iter().copied())
{
log::warn!(
"Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
}
let table_count = payload.table_ids.len();
let table_ids = payload.table_ids.iter().copied().collect::<Vec<_>>();
version_history_lock.upgrade_version(
&opts.config.path,
|current| {
let mut copy = current.clone();
let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
copy.version = copy
.version
.with_moved(&table_ids, payload.dest_level as usize, &ctx);
Ok(copy)
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
opts.runtime_config.load_full(),
opts.encryption.clone(),
)?;
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
Ok(CompactionResult {
action: CompactionAction::Moved,
dest_level: Some(payload.dest_level),
tables_in: table_count,
tables_out: table_count,
})
}
fn pick_blob_files_to_rewrite(
picked_tables: &HashSet<TableId>,
current_version: &Version,
blob_opts: &crate::KvSeparationOptions,
) -> crate::Result<Vec<BlobFile>> {
use crate::Table;
let linked_blob_files = picked_tables
.iter()
.map(|&id| {
current_version.get_table(id).unwrap_or_else(|| {
panic!("Table {id} should exist");
})
})
.map(Table::list_blob_file_references)
.collect::<Result<Vec<_>, _>>()?;
let mut linked_blob_files = linked_blob_files
.into_iter()
.flatten()
.flatten()
.map(|blob_file_ref| {
current_version
.blob_files
.get(blob_file_ref.blob_file_id)
.unwrap_or_else(|| {
panic!("Blob file {} should exist", blob_file_ref.blob_file_id);
})
})
.filter(|blob_file| {
blob_file.is_stale(current_version.gc_stats(), blob_opts.staleness_threshold)
})
.filter(|blob_file| {
!blob_file.is_dead(current_version.gc_stats())
})
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
linked_blob_files.sort_by_key(|a| a.id());
#[expect(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "precision loss and truncation are acceptable for cutoff calculation"
)]
let cutoff_point = {
let len = linked_blob_files.len() as f32;
(len * blob_opts.age_cutoff) as usize
};
linked_blob_files.drain(cutoff_point..);
for table in current_version.iter_tables() {
if picked_tables.contains(&table.id()) {
continue;
}
let other_refs = table
.list_blob_file_references()?
.unwrap_or_default()
.into_iter()
.filter(|x| linked_blob_files.iter().any(|bf| bf.id() == x.blob_file_id))
.collect::<Vec<_>>();
for additional_ref in other_refs {
linked_blob_files.retain(|x| x.id() != additional_ref.blob_file_id);
}
}
Ok(linked_blob_files.into_iter().cloned().collect::<Vec<_>>())
}
/// Runs `f` and, if it fails, logs the error and removes the payload's tables
/// from the hidden set so they can be compacted again.
///
/// The compaction-state lock is only taken on the error path. The result of
/// `f` is returned unchanged.
fn hidden_guard<T>(
    payload: &CompactionPayload,
    opts: &Options,
    f: impl FnOnce() -> crate::Result<T>,
) -> crate::Result<T> {
    let outcome = f();

    if let Err(e) = &outcome {
        log::error!("Compaction failed: {e:?}");

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut state = opts.compaction_state.lock().expect("lock is poisoned");
        state
            .hidden_set_mut()
            .show(payload.table_ids.iter().copied());
    }

    outcome
}
#[expect(clippy::too_many_lines)]
fn merge_tables(
mut compaction_state: MutexGuard<'_, CompactionState>,
version_history_lock: RwLockReadGuard<'_, SuperVersions>,
opts: &Options,
payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
if opts.stop_signal.is_stopped() {
log::debug!("Stopping before compaction because of stop signal");
return Ok(CompactionResult::nothing());
}
if compaction_state
.hidden_set()
.should_decline_compaction(payload.table_ids.iter().copied())
{
log::warn!(
"Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
}
let current_super_version = Arc::new(version_history_lock.latest_version());
let Some(tables) = payload
.table_ids
.iter()
.map(|&id| current_super_version.version.get_table(id).cloned())
.collect::<Option<Vec<_>>>()
else {
log::warn!(
"Compaction task created by {:?} contained tables not referenced in the level manifest",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
};
let tables_in = payload.table_ids.len();
let mut input_range_tombstones: Vec<crate::range_tombstone::RangeTombstone> = tables
.iter()
.flat_map(|t| t.range_tombstones().iter().cloned())
.collect();
input_range_tombstones.sort();
input_range_tombstones.dedup();
#[cfg(feature = "std")]
{
let threads = opts.config.compaction_threads;
let dst_lvl: usize = payload.canonical_level.into();
let is_last_level = payload.dest_level == opts.config.level_count - 1;
let relocating = match &opts.config.kv_separation_opts {
Some(blob_opts) => !pick_blob_files_to_rewrite(
&payload.table_ids,
¤t_super_version.version,
blob_opts,
)?
.is_empty(),
None => false,
};
let total_input_bytes: u64 = tables.iter().map(Table::file_size).sum();
let boundaries = if threads > 1
&& !relocating
&& total_input_bytes >= opts.config.subcompaction_min_bytes
{
subcompaction_boundaries(
¤t_super_version.version,
payload.dest_level as usize,
threads,
&opts.config.comparator,
)
} else {
Vec::new()
};
if !boundaries.is_empty() {
let blobs_folder = opts.config.path.join(BLOBS_FOLDER);
let ranges = ranges_from_boundaries(&boundaries);
drop(version_history_lock);
compaction_state
.hidden_set_mut()
.hide(payload.table_ids.iter().copied());
drop(compaction_state);
let num_ranges = ranges.len();
let only_first_owns_inputs =
|idx: usize| if idx == 0 { tables.clone() } else { Vec::new() };
let outputs: Vec<crate::Result<super::flavour::ProducedOutput>> =
if let Some(spawner) = opts.config.compaction_pool.clone() {
let opts = Arc::new(opts.clone());
let payload = Arc::new(payload.clone());
let version = Arc::clone(¤t_super_version);
let rts = Arc::new(input_range_tombstones.clone());
let blobs = Arc::new(blobs_folder);
let (tx, rx) = std::sync::mpsc::channel();
for (idx, range) in ranges.iter().cloned().enumerate() {
let tx = tx.clone();
let opts = Arc::clone(&opts);
let payload = Arc::clone(&payload);
let version = Arc::clone(&version);
let rts = Arc::clone(&rts);
let blobs = Arc::clone(&blobs);
let tables_for_deletion = only_first_owns_inputs(idx);
spawner.spawn(Box::new(move || {
let out = run_subcompaction(
&opts,
&payload,
&version.version,
tables_for_deletion,
&rts,
range,
dst_lvl,
is_last_level,
&blobs,
);
let _ = tx.send((idx, out));
}));
}
drop(tx);
let mut slots: Vec<Option<crate::Result<super::flavour::ProducedOutput>>> =
(0..num_ranges).map(|_| None).collect();
for (idx, out) in rx {
if let Some(slot) = slots.get_mut(idx) {
*slot = Some(out);
}
}
slots
.into_iter()
.map(|slot| {
slot.unwrap_or_else(|| {
Err(crate::Error::Io(std::io::Error::other(
"sub-compaction worker did not report a result",
)))
})
})
.collect()
} else {
ranges
.iter()
.cloned()
.enumerate()
.map(|(idx, range)| {
run_subcompaction(
opts,
payload,
¤t_super_version.version,
only_first_owns_inputs(idx),
&input_range_tombstones,
range,
dst_lvl,
is_last_level,
&blobs_folder,
)
})
.collect()
};
let mut committed = Vec::with_capacity(outputs.len());
let mut first_err = None;
for out in outputs {
match out {
Ok(done) => committed.push(done),
Err(e) => {
first_err.get_or_insert(e);
}
}
}
if let Some(err) = first_err {
log::error!("Sub-compaction failed: {err:?}");
for done in &committed {
done.rollback_uninstalled();
}
{
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut state = opts.compaction_state.lock().expect("lock is poisoned");
state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
}
return Err(err);
}
let outputs = committed;
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned");
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");
let tables_out =
super::flavour::install_merge(&mut version_history_lock, opts, payload, outputs)
.inspect_err(|e| {
log::error!("Sub-compaction install failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?;
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| log::error!("Manifest maintenance failed: {e:?}"))?;
drop(version_history_lock);
drop(compaction_state);
log::trace!("Parallel compaction done in {num_ranges} sub-ranges");
return Ok(CompactionResult {
action: CompactionAction::Merged,
dest_level: Some(payload.dest_level),
tables_in,
tables_out,
});
}
}
let mut blob_frag_map = FragmentationMap::default();
let Some(mut merge_iter) = create_compaction_stream(
¤t_super_version.version,
&payload.table_ids.iter().copied().collect::<Vec<_>>(),
opts.mvcc_gc_watermark,
opts.config.merge_operator.clone(),
opts.config.comparator.clone(),
)?
else {
log::warn!(
"Compaction task tried to compact tables that do not exist, declining to run it"
);
return Ok(CompactionResult::nothing());
};
let dst_lvl = payload.canonical_level.into();
let is_last_level = payload.dest_level == opts.config.level_count - 1;
merge_iter = merge_iter
.evict_tombstones(is_last_level)
.zero_seqnos(false);
#[cfg(feature = "std")]
let zeroing_tombstones = if is_last_level {
collect_version_tombstones(¤t_super_version.version)
} else {
Vec::new()
};
#[cfg(feature = "std")]
if is_last_level {
merge_iter = merge_iter.with_range_tombstone_application(
zeroing_tombstones.clone(),
opts.config.comparator.clone(),
);
}
let blobs_folder = opts.config.path.join(BLOBS_FOLDER);
let filter_ctx = Context { is_last_level };
let mut compaction_filter = opts.config.compaction_filter_factory.as_ref().map(|f| {
log::trace!("Installing custom compaction filter {:?}", f.name());
f.make_filter(&filter_ctx)
});
let mut filter_blob_writer = None;
let mut merge_iter = merge_iter.with_filter(StreamFilterAdapter::new(
compaction_filter.as_deref_mut(),
opts,
¤t_super_version.version,
&blobs_folder,
&mut filter_blob_writer,
&filter_ctx,
));
let table_writer =
super::flavour::prepare_table_writer(¤t_super_version.version, opts, payload, true)?;
let start = Instant::now();
let mut compactor = match &opts.config.kv_separation_opts {
Some(blob_opts) => {
merge_iter = merge_iter.with_drop_callback(&mut blob_frag_map);
let blob_files_to_rewrite = pick_blob_files_to_rewrite(
&payload.table_ids,
¤t_super_version.version,
blob_opts,
)?;
if blob_files_to_rewrite.is_empty() {
log::debug!("No blob relocation needed");
Box::new(StandardCompaction::new(table_writer, tables))
as Box<dyn super::flavour::CompactionFlavour>
} else {
log::debug!(
"Relocate blob files: {:?}",
blob_files_to_rewrite
.iter()
.map(BlobFile::id)
.collect::<Vec<_>>(),
);
let scanner = BlobFileMergeScanner::new(
blob_files_to_rewrite
.iter()
.map(|bf| BlobFileScanner::new(&bf.0.path, bf.id()))
.collect::<crate::Result<Vec<_>>>()?,
);
let writer = BlobFileWriter::new(
opts.blob_file_id_generator.clone(),
&blobs_folder,
opts.tree_id,
opts.config.descriptor_table.clone(),
opts.config.fs.clone(),
)?
.use_target_size(blob_opts.file_target_size)
.use_passthrough_compression(blob_opts.compression)
.use_sync_mode(opts.config.sync_mode);
let inner = StandardCompaction::new(table_writer, tables);
Box::new(RelocatingCompaction::new(
inner,
scanner.peekable(),
writer,
blob_files_to_rewrite,
opts.rate_limiter.clone(),
opts.stop_signal.clone(),
))
}
}
None => Box::new(StandardCompaction::new(table_writer, tables)),
};
log::trace!("Blob file GC preparation done in {:?}", start.elapsed());
drop(version_history_lock);
{
compaction_state
.hidden_set_mut()
.hide(payload.table_ids.iter().copied());
}
drop(compaction_state);
hidden_guard(payload, opts, || {
if !input_range_tombstones.is_empty() {
log::debug!(
"Propagating {} range tombstones to compaction output",
input_range_tombstones.len(),
);
compactor.write_range_tombstones(&input_range_tombstones);
}
#[cfg(feature = "std")]
let merge_iter = super::seqno_zeroer::BottommostSeqnoZeroer::new(
merge_iter,
is_last_level,
zeroing_tombstones,
opts.mvcc_gc_watermark,
opts.config.comparator.clone(),
);
for (idx, item) in merge_iter.enumerate() {
let item = item?;
let io_bytes = (item.key.user_key.len() as u64).saturating_add(item.value.len() as u64);
if opts
.rate_limiter
.request_interruptible(io_bytes, || opts.stop_signal.is_stopped())
{
log::debug!("Stopping amidst compaction because of stop signal (I/O throttle)");
return Ok(());
}
compactor.write(item)?;
if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() {
log::debug!("Stopping amidst compaction because of stop signal");
return Ok(());
}
}
Ok(())
})?;
if let Some(filter) = compaction_filter {
filter.finish();
}
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned");
log::trace!("Acquiring super version write lock");
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");
log::trace!("Acquired super version write lock");
log::trace!("Blob fragmentation diff: {blob_frag_map:#?}");
let extra_blob_files = filter_blob_writer
.map(BlobFileWriter::finish)
.transpose()
.inspect_err(|e| {
log::error!("Compaction failed while finishing filter blob writer: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?
.unwrap_or_default();
let rollback_extra_blob_files = extra_blob_files.clone();
let produce_output = compactor
.produce(opts, dst_lvl, blob_frag_map, extra_blob_files)
.inspect_err(|e| {
log::error!("Compaction failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
for blob_file in &rollback_extra_blob_files {
blob_file.mark_as_deleted();
}
})?;
let tables_out = super::flavour::install_merge(
&mut version_history_lock,
opts,
payload,
vec![produce_output],
)
.inspect_err(|e| {
log::error!("Compaction failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?;
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| {
log::error!("Manifest maintenance failed: {e:?}");
})?;
drop(version_history_lock);
drop(compaction_state);
log::trace!("Compaction successful");
Ok(CompactionResult {
action: CompactionAction::Merged,
dest_level: Some(payload.dest_level),
tables_in,
tables_out,
})
}
fn drop_tables(
compaction_state: MutexGuard<'_, CompactionState>,
opts: &Options,
ids_to_drop: &[TableId],
) -> crate::Result<CompactionResult> {
#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");
if compaction_state
.hidden_set()
.should_decline_compaction(ids_to_drop.iter().copied())
{
log::warn!(
"Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
}
let Some(tables) = ids_to_drop
.iter()
.map(|&id| {
version_history_lock
.latest_version()
.version
.get_table(id)
.cloned()
})
.collect::<Option<Vec<_>>>()
else {
log::warn!(
"Compaction task created by {:?} contained tables not referenced in the level manifest",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
};
log::debug!("Dropping tables: {ids_to_drop:?}");
let mut dropped_blob_files = vec![];
version_history_lock.upgrade_version(
&opts.config.path,
|current| {
let mut copy = current.clone();
let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
copy.version = copy
.version
.with_dropped(ids_to_drop, &mut dropped_blob_files, &ctx)?;
Ok(copy)
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
opts.runtime_config.load_full(),
opts.encryption.clone(),
)?;
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
drop(version_history_lock);
for table in tables {
table.mark_as_deleted();
}
for blob_file in dropped_blob_files {
blob_file.mark_as_deleted();
}
let tables_dropped = ids_to_drop.len();
drop(compaction_state);
log::trace!("Dropped {tables_dropped} tables");
Ok(CompactionResult {
action: CompactionAction::Dropped,
dest_level: None,
tables_in: tables_dropped,
tables_out: 0,
})
}
#[cfg(test)]
mod tests {
use super::{create_compaction_stream, pick_run_indexes};
use crate::{
AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter, TableId,
compaction::{Choice, CompactionStrategy, Input, state::CompactionState},
config::BlockSizePolicy,
version::Version,
};
use std::sync::Arc;
use test_log::test;
/// Test comparator that orders keys by their first byte only, so keys sharing
/// a first byte (e.g. `a1` and `a2`) compare as equal. An empty key sorts
/// before any non-empty key.
struct FirstByteComparator;
impl crate::comparator::UserComparator for FirstByteComparator {
    fn name(&self) -> &'static str {
        "test-first-byte"
    }
    fn compare(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
        let lead_a = a.first();
        let lead_b = b.first();
        lead_a.cmp(&lead_b)
    }
}
/// `boundary_candidates` must de-duplicate with the supplied comparator rather
/// than byte equality. Under `FirstByteComparator`, `a1` and `a2` are equal, so
/// only one `a*` key remains. The greatest candidate (`b1`) is then discarded,
/// leaving exactly one boundary that starts with `a`.
#[test]
fn boundary_candidates_dedups_comparator_equal_keys() {
let cmp: crate::comparator::SharedComparator = Arc::new(FirstByteComparator);
let keys = vec![
crate::UserKey::from("a1"),
crate::UserKey::from("a2"),
crate::UserKey::from("b1"),
];
let out = super::boundary_candidates(keys, &cmp);
assert_eq!(
out.len(),
1,
"comparator-equal keys must collapse to a single boundary candidate",
);
assert_eq!(
out.first().and_then(|k| k.first()),
Some(&b'a'),
"the surviving boundary should be from the deduped a-group",
);
}
/// A failing sub-compaction must abort the whole parallel merge. Outputs of
/// the other ranges are rolled back, the table count is unchanged, and every
/// key still reads its newest value.
#[cfg(feature = "parallel")]
#[test]
fn failed_subcompaction_rolls_back_and_restores_inputs() -> crate::Result<()> {
use std::sync::atomic::Ordering;
const N: u64 = 4_000;
let key = |i: u64| format!("key_{i:08}");
let val = |i: u64, generation: u64| format!("g{generation}-{i}-{}", "x".repeat(40));
let dir = tempfile::tempdir()?;
// Four threads and a zero byte threshold let the merge take the
// sub-compaction path. The low separation threshold presumably stores the
// ~50-byte values in blob files.
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.with_kv_separation(Some(
KvSeparationOptions::default().separation_threshold(16),
));
let failpoint = config.fail_one_subcompaction.clone();
let tree = config.open()?;
for i in 0..N {
tree.insert(key(i), val(i, 0), i);
}
tree.flush_active_memtable(0)?;
// First argument presumably the target table size: small outputs give the
// destination level many tables, and therefore many boundary candidates.
tree.major_compact(4_096, 0)?;
// Overwrite every key with a newer generation so the next merge has real work.
for i in 0..N {
tree.insert(key(i), val(i, 1), N + i);
}
tree.flush_active_memtable(0)?;
let tables_before = tree.table_count();
// Arm the failpoint; `run_subcompaction` disarms it when it fires.
failpoint.store(true, Ordering::SeqCst);
let result = tree.major_compact(u64::MAX, 0);
assert!(
result.is_err(),
"a failing sub-compaction range must abort the compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed itself",
);
assert_eq!(
tree.table_count(),
tables_before,
"rollback must leave nothing partially installed",
);
for i in 0..N {
assert_eq!(
tree.get(key(i), crate::MAX_SEQNO)?.as_deref(),
Some(val(i, 1).as_bytes()),
"value for {} must survive the rolled-back compaction",
key(i),
);
}
Ok(())
}
/// A range tombstone whose seqno is below the GC watermark must be handled
/// fully by a last-level major compaction:
/// - the keys it covers (`k0000`..`k0049`) disappear;
/// - keys rewritten after it survive;
/// - the tombstone itself no longer appears in the version.
#[test]
fn last_level_applies_and_gcs_below_watermark_range_tombstone() -> crate::Result<()> {
    let key = |i: u64| format!("k{i:04}");
    let val = |i: u64| format!("v{i}-{}", "x".repeat(40));

    let dir = tempfile::tempdir()?;
    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .data_block_size_policy(BlockSizePolicy::all(512))
    .compaction_threads(4)
    .subcompaction_min_bytes(0)
    .open()?;

    // Seed 200 keys at seqnos 0..200, then compact into small (4_096 target) tables.
    (0..200u64).for_each(|i| {
        tree.insert(key(i), val(i), i);
    });
    tree.flush_active_memtable(0)?;
    tree.major_compact(4_096, 0)?;

    // Range-delete from k0000 to k0050 at seqno 1000.
    // Then rewrite keys 50..200 at seqnos above the tombstone.
    let range_start = crate::UserKey::from("k0000");
    let range_end = crate::UserKey::from("k0050");
    tree.remove_range(range_start, range_end, 1000);
    (50..200u64).for_each(|i| {
        tree.insert(key(i), val(i), 1001 + i);
    });
    tree.flush_active_memtable(0)?;

    // Watermark 5000 is above every seqno written in this test.
    tree.major_compact(u64::MAX, 5000)?;

    for i in 0..50u64 {
        assert_eq!(
            tree.get(key(i), crate::MAX_SEQNO)?,
            None,
            "covered key {} must be physically gone after GC",
            key(i),
        );
    }

    for i in 50..200u64 {
        let found = tree.get(key(i), crate::MAX_SEQNO)?.is_some();
        assert!(found, "uncovered key {} must survive", key(i));
    }

    let remaining = super::collect_version_tombstones(&tree.current_version());
    assert!(
        remaining.is_empty(),
        "a fully-applied below-watermark tombstone must be GC'd, found {remaining:?}",
    );

    Ok(())
}
/// A range tombstone written above the GC watermark (seqno 100 vs watermark 50)
/// must still be present in the version after a major compaction.
#[test]
fn above_watermark_range_tombstone_is_retained() -> crate::Result<()> {
    let key = |i: u64| format!("k{i:04}");

    let dir = tempfile::tempdir()?;
    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    for seqno in 0..50u64 {
        tree.insert(key(seqno), "v", seqno);
    }
    tree.flush_active_memtable(0)?;

    let range_start = crate::UserKey::from("k0000");
    let range_end = crate::UserKey::from("k0025");
    tree.remove_range(range_start, range_end, 100);
    tree.flush_active_memtable(0)?;

    tree.major_compact(u64::MAX, 50)?;

    let retained = super::collect_version_tombstones(&tree.current_version());
    assert!(
        !retained.is_empty(),
        "an above-watermark tombstone must be retained, not GC'd",
    );

    Ok(())
}
/// A range tombstone whose seqno equals the GC watermark (both 1000) must be
/// neither applied nor garbage-collected.
/// - Reading at seqno 1000 must still return the covered keys' original values.
/// - The tombstone must remain in the version.
#[test]
fn range_tombstone_at_exact_watermark_is_not_applied_or_gced() -> crate::Result<()> {
    let key = |i: u64| format!("k{i:04}");
    let val = |i: u64| format!("v{i}-{}", "x".repeat(40));

    let dir = tempfile::tempdir()?;
    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .data_block_size_policy(BlockSizePolicy::all(512))
    .compaction_threads(4)
    .subcompaction_min_bytes(0)
    .open()?;

    (0..200u64).for_each(|i| {
        tree.insert(key(i), val(i), i);
    });
    tree.flush_active_memtable(0)?;
    tree.major_compact(4_096, 0)?;

    let range_start = crate::UserKey::from("k0000");
    let range_end = crate::UserKey::from("k0050");
    tree.remove_range(range_start, range_end, 1000);
    (50..200u64).for_each(|i| {
        tree.insert(key(i), val(i), 1001 + i);
    });
    tree.flush_active_memtable(0)?;

    // Watermark exactly equal to the tombstone's seqno.
    tree.major_compact(u64::MAX, 1000)?;

    for i in 0..50u64 {
        let expected = val(i);
        assert_eq!(
            tree.get(key(i), 1000)?.as_deref(),
            Some(expected.as_bytes()),
            "covered key {} must survive: RT@watermark is invisible at read==watermark",
            key(i),
        );
    }

    let retained = super::collect_version_tombstones(&tree.current_version());
    assert!(
        !retained.is_empty(),
        "a tombstone at the exact watermark must be retained, not GC'd",
    );

    Ok(())
}
/// `create_compaction_stream` returns `None` when the requested table id (666)
/// does not exist in the current version.
#[test]
fn compaction_stream_run_not_found() -> crate::Result<()> {
    // Borrow the tempdir so it outlives the tree and removes the tree's files
    // at the end of the test. Moving the `TempDir` into `Config::new` would
    // drop it as soon as `new` returns. That deletes the directory before the
    // tree opens and leaks everything written afterwards.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;

    assert!(
        create_compaction_stream(
            &tree.current_version(),
            &[666],
            0,
            None,
            crate::comparator::default_comparator()
        )?
        .is_none()
    );

    Ok(())
}
/// `pick_run_indexes` resolves the id list `[0, 1, 2]` to the index span `(0, 2)`
/// in the first level-0 run. The run is built from three single-key flushes,
/// whose tables are assumed to receive ids 0, 1 and 2.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run() -> crate::Result<()> {
    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("b", "b", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("c", "c", 0);
    tree.flush_active_memtable(0)?;

    assert_eq!(
        Some((0, 2)),
        pick_run_indexes(
            tree.current_version()
                .level(0)
                .unwrap()
                .iter()
                .next()
                .unwrap(),
            &[0, 1, 2],
        )
    );

    Ok(())
}
/// `pick_run_indexes` resolves the single id `[0]` to the index span `(0, 0)`
/// in the first level-0 run. The run is built from three single-key flushes.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_2() -> crate::Result<()> {
    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("b", "b", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("c", "c", 0);
    tree.flush_active_memtable(0)?;

    assert_eq!(
        Some((0, 0)),
        pick_run_indexes(
            tree.current_version()
                .level(0)
                .unwrap()
                .iter()
                .next()
                .unwrap(),
            &[0],
        )
    );

    Ok(())
}
/// `pick_run_indexes` resolves the single id `[2]` to the index span `(2, 2)`
/// in the first level-0 run. The run is built from three single-key flushes.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_3() -> crate::Result<()> {
    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("b", "b", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("c", "c", 0);
    tree.flush_active_memtable(0)?;

    assert_eq!(
        Some((2, 2)),
        pick_run_indexes(
            tree.current_version()
                .level(0)
                .unwrap()
                .iter()
                .next()
                .unwrap(),
            &[2],
        )
    );

    Ok(())
}
/// `pick_run_indexes` returns `None` for an id (`4`) that is not in the first
/// level-0 run. The run is built from three single-key flushes.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_4() -> crate::Result<()> {
    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("b", "b", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("c", "c", 0);
    tree.flush_active_memtable(0)?;

    assert_eq!(
        None,
        pick_run_indexes(
            tree.current_version()
                .level(0)
                .unwrap()
                .iter()
                .next()
                .unwrap(),
            &[4],
        )
    );

    Ok(())
}
/// Three flushed single-key tables are all dropped by a FIFO compaction.
/// The FIFO strategy is built with `Fifo::new(1, None)`; the `1` is presumably
/// a size limit that every table exceeds.
#[test]
fn compaction_drop_tables() -> crate::Result<()> {
    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Each flush adds one key and leaves no sealed memtables behind.
    tree.insert("a", "a", 0);
    tree.flush_active_memtable(0)?;
    assert_eq!(1, tree.approximate_len());
    assert_eq!(0, tree.sealed_memtable_count());

    tree.insert("b", "a", 1);
    tree.flush_active_memtable(0)?;
    assert_eq!(2, tree.approximate_len());
    assert_eq!(0, tree.sealed_memtable_count());

    tree.insert("c", "a", 2);
    tree.flush_active_memtable(0)?;
    assert_eq!(3, tree.approximate_len());
    assert_eq!(0, tree.sealed_memtable_count());

    tree.compact(Arc::new(crate::compaction::Fifo::new(1, None)), 3)?;
    assert_eq!(0, tree.table_count());

    Ok(())
}
/// Tracks blob-file fragmentation stats (`gc_stats`) as tables that reference
/// a single blob file are dropped and then rewritten by targeted merges.
#[test]
fn blob_file_picking_simple() -> crate::Result<()> {
    /// Strategy that always merges exactly the configured table ids into level 6.
    struct InPlaceStrategy(Vec<TableId>);

    impl CompactionStrategy for InPlaceStrategy {
        fn get_name(&self) -> &'static str {
            "InPlaceCompaction"
        }

        fn choose(&self, _: &Version, _: &Config, _: &CompactionState) -> Choice {
            Choice::Merge(Input {
                table_ids: self.0.iter().copied().collect(),
                dest_level: 6,
                target_size: 64_000_000,
                canonical_level: 6,
            })
        }
    }

    // Borrow (don't move) the tempdir. A moved `TempDir` is dropped inside
    // `Config::new`, deleting the directory early and leaking the tree's files.
    let folder = tempfile::tempdir()?;
    let tree = crate::Config::new(
        &folder,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .data_block_size_policy(BlockSizePolicy::all(1))
    .with_kv_separation(Some(
        KvSeparationOptions::default()
            .separation_threshold(1)
            .age_cutoff(1.0)
            .staleness_threshold(0.01)
            .compression(crate::CompressionType::None),
    ))
    .open()?;

    // One flush: one table, with every value separated into one blob file.
    tree.insert("a", "a", 0);
    tree.insert("b", "b", 0);
    tree.insert("c", "c", 0);
    tree.flush_active_memtable(1_000)?;
    assert_eq!(0, tree.sealed_memtable_count());
    assert_eq!(1, tree.table_count());
    assert_eq!(1, tree.blob_file_count());

    // A target size of 1 splits the data into three tables that share the blob file.
    tree.major_compact(1, 1_000)?;
    assert_eq!(3, tree.table_count());
    assert_eq!(1, tree.blob_file_count());

    // Dropping "a"'s table leaves a fragmentation entry for blob file 0.
    tree.drop_range("a"..="a")?;
    assert_eq!(2, tree.table_count());
    assert_eq!(1, tree.blob_file_count());
    {
        assert_eq!(
            &{
                let mut map = crate::HashMap::default();
                map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
                map
            },
            &**tree.current_version().gc_stats(),
        );
    }

    // Rewriting one table in place leaves the fragmentation stats unchanged.
    // (Table ids 2, 3, 4 are assumed from the id counter's allocation order.)
    tree.compact(Arc::new(InPlaceStrategy(vec![2])), 1_000)?;
    assert_eq!(2, tree.table_count());
    assert_eq!(1, tree.blob_file_count());
    {
        assert_eq!(
            &{
                let mut map = crate::HashMap::default();
                map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
                map
            },
            &**tree.current_version().gc_stats(),
        );
    }

    // Merging the two remaining tables leaves one table and empty gc stats.
    tree.compact(Arc::new(InPlaceStrategy(vec![3, 4])), 1_000)?;
    assert_eq!(1, tree.table_count());
    assert_eq!(1, tree.blob_file_count());
    {
        assert_eq!(
            crate::HashMap::default(),
            **tree.current_version().gc_stats(),
        );
    }

    Ok(())
}
}