use super::{CompactionAction, CompactionResult, CompactionStrategy, Input as CompactionPayload};
use crate::time::Instant;
use crate::tree::inner::{CompactionGuard, VersionsReadGuard};
use crate::{
BlobFile, Config, HashSet, InternalValue, SeqNo, SequenceNumberCounter,
SharedSequenceNumberGenerator, Table, TableId, UserKey,
blob_tree::FragmentationMap,
compaction::{
Choice,
filter::{Context, StreamFilterAdapter},
flavour::{RelocatingCompaction, StandardCompaction},
state::CompactionState,
stream::CompactionStream,
},
file::BLOBS_FOLDER,
merge::Merger,
run_scanner::RunScanner,
stop_signal::StopSignal,
tree::inner::TreeId,
version::{Run, SuperVersions, Version},
vlog::{BlobFileId, BlobFileMergeScanner, BlobFileScanner, BlobFileWriter},
};
use alloc::sync::Arc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, vec::Vec};
#[cfg(feature = "std")]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "std"))]
use spin::{Mutex, RwLock};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// A boxed, fallible iterator of internal values; each one is a single input
/// of a compaction merge.
pub type CompactionReader<'a> = Box<dyn Iterator<Item = crate::Result<InternalValue>> + 'a>;
/// Minimum total input size (8 MiB) for splitting a merge into
/// sub-compactions.
///
/// NOTE(review): `merge_tables` compares against
/// `config.subcompaction_min_bytes`; presumably this constant is that
/// setting's default — confirm.
#[cfg(feature = "std")]
pub const SUBCOMPACTION_MIN_INPUT_BYTES: u64 = 8 * 1024 * 1024;
/// Everything one compaction run needs, typically captured from a tree via
/// `Options::from_tree`.
#[derive(Clone)]
pub struct Options {
    /// Id of the tree being compacted; handed to new blob file writers.
    pub tree_id: TreeId,
    /// Sequence number generator passed to `upgrade_version` when a new
    /// version is installed.
    pub global_seqno: SharedSequenceNumberGenerator,
    /// Visible sequence number generator, likewise passed to `upgrade_version`.
    pub visible_seqno: SharedSequenceNumberGenerator,
    /// Counter taken from the tree's table id counter (presumably used to
    /// allocate ids for output tables — confirm in `prepare_table_writer`).
    pub table_id_generator: SequenceNumberCounter,
    /// Counter handed to `BlobFileWriter::new` for new blob file ids.
    pub blob_file_id_generator: SequenceNumberCounter,
    /// Tree configuration.
    pub config: Arc<Config>,
    /// Shared version history: read to choose inputs, written to install results.
    pub version_history: Arc<RwLock<SuperVersions>>,
    /// Strategy consulted by `do_compaction` to choose the next action.
    pub strategy: Arc<dyn CompactionStrategy>,
    /// Polled to abort running compactions.
    pub stop_signal: StopSignal,
    /// Seqno used as the eviction watermark of compaction streams and passed
    /// to version maintenance; `from_tree` initializes it to 0.
    pub mvcc_gc_watermark: u64,
    /// Shared compaction state, including the hidden-table set consulted to
    /// decline overlapping work.
    pub compaction_state: Arc<Mutex<CompactionState>>,
    /// Runtime-tunable settings (admission check, storage limit,
    /// tight-space compaction toggle, ...).
    pub runtime_config: Arc<crate::runtime_config::handle::RuntimeConfigHandle>,
    /// Optional encryption provider passed through to `upgrade_version`.
    pub encryption: Option<Arc<dyn crate::encryption::EncryptionProvider>>,
    /// Throttles compaction I/O. `from_tree` builds a fresh limiter per call.
    pub rate_limiter: Arc<crate::rate_limiter::RateLimiter>,
    /// Metrics sink (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    pub metrics: Arc<Metrics>,
}
impl Options {
    /// Captures the tree's shared compaction context together with the
    /// `strategy` to consult.
    ///
    /// The MVCC GC watermark starts at 0. A new rate limiter is created from
    /// `config.compaction_rate_limit` on every call, so it is not shared with
    /// other `Options` values built by separate calls.
    pub fn from_tree(tree: &crate::Tree, strategy: Arc<dyn CompactionStrategy>) -> Self {
        let config = &tree.config;
        let rate_limiter = Arc::new(crate::rate_limiter::RateLimiter::new(
            config.compaction_rate_limit,
        ));

        Self {
            tree_id: tree.id,
            global_seqno: config.seqno.clone(),
            visible_seqno: config.visible_seqno.clone(),
            table_id_generator: tree.table_id_counter.clone(),
            blob_file_id_generator: tree.blob_file_id_counter.clone(),
            config: config.clone(),
            version_history: tree.version_history.clone(),
            strategy,
            stop_signal: tree.stop_signal.clone(),
            mvcc_gc_watermark: 0,
            compaction_state: tree.compaction_state.clone(),
            runtime_config: tree.runtime_config.clone(),
            encryption: config.encryption.clone(),
            rate_limiter,
            #[cfg(feature = "metrics")]
            metrics: tree.metrics.clone(),
        }
    }
}
/// Runs one compaction round: asks the configured strategy what to do and
/// executes that choice (merge, move, drop, or nothing).
///
/// The compaction-state mutex and a read lock on the version history are
/// acquired up front and held while the strategy chooses. Each branch then
/// either moves both guards into the executor or releases the version lock
/// first.
///
/// # Errors
///
/// Propagates errors from the merge space gate and from the chosen executor.
pub fn do_compaction(opts: &Options) -> crate::Result<CompactionResult> {
    let compaction_state = opts.compaction_state.lock();
    let version_history_lock = opts.version_history.read();
    let start = Instant::now();
    log::trace!(
        "Consulting compaction strategy {:?}",
        opts.strategy.get_name(),
    );
    let choice = opts.strategy.choose(
        &version_history_lock.latest_version().version,
        &opts.config,
        &compaction_state,
    );
    log::debug!("Compaction choice: {choice:?} in {:?}", start.elapsed());
    match choice {
        Choice::Merge(payload) => {
            // Check quota / free space before committing to the merge. The inner
            // scope ends the use of `super_version` before the guards are moved.
            let decision = {
                let super_version = version_history_lock.latest_version();
                space_gate_for_merge(&super_version.version, opts, &payload)?
            };
            match decision {
                SpaceGate::Run => {
                    merge_tables(compaction_state, version_history_lock, opts, &payload)
                }
                SpaceGate::Narrowed(narrowed) => {
                    log::debug!(
                        "Compaction space gate: narrowed merge from {} to {} tables to fit free space",
                        payload.table_ids.len(),
                        narrowed.table_ids.len(),
                    );
                    merge_tables(compaction_state, version_history_lock, opts, &narrowed)
                }
                SpaceGate::Skip => {
                    // Opt-in fallback (std only): slice-by-slice compaction that
                    // requires hole punching on the destination backend.
                    #[cfg(feature = "std")]
                    if opts.runtime_config.load().tight_space_compaction {
                        return run_tight_space_compaction(
                            compaction_state,
                            version_history_lock,
                            opts,
                            &payload,
                        );
                    }
                    log::info!(
                        "Compaction space gate: skipping {}-table merge — free space cannot cover the transient output and no fitting subset exists (opt-in tight-space reclaim handles this)",
                        payload.table_ids.len(),
                    );
                    Ok(CompactionResult::nothing())
                }
            }
        }
        Choice::Move(payload) => {
            // With per-level folder routing, a move whose tables live outside the
            // destination level's folder is executed as a merge instead.
            // NOTE(review): this merge path does not go through the space gate.
            if opts.config.level_routes.is_some() {
                let (dst_folder, _) = opts.config.tables_folder_for_level(payload.dest_level);
                let version = &version_history_lock.latest_version().version;
                let cross_folder = version
                    .iter_levels()
                    .flat_map(|level| level.iter())
                    .flat_map(|run| run.iter())
                    .filter(|t| payload.table_ids.contains(&t.id()))
                    .any(|t| t.path.parent() != Some(dst_folder.as_path()));
                if cross_folder {
                    log::debug!("Converting trivial move to merge: cross-folder level routing");
                    return merge_tables(compaction_state, version_history_lock, opts, &payload);
                }
            }
            // `move_tables` takes the version history write lock itself.
            drop(version_history_lock);
            move_tables(&compaction_state, opts, &payload)
        }
        Choice::Drop(payload) => {
            drop(version_history_lock);
            let ids = payload.into_iter().collect::<Vec<_>>();
            drop_tables(compaction_state, opts, &ids)
        }
        Choice::DoNothing => {
            log::trace!("Compactor chose to do nothing");
            Ok(CompactionResult::nothing())
        }
    }
}
/// Outcome of the free-space / quota admission check for a merge.
enum SpaceGate {
    /// The full payload fits; run it unchanged.
    Run,
    /// The full payload does not fit, but this smaller payload (two adjacent
    /// tables of the same run) does.
    Narrowed(CompactionPayload),
    /// Neither the payload nor any narrowed candidate fits.
    Skip,
}
/// Returns whether a compaction producing `sst_bytes` of tables (for
/// `sst_dest_level`) and `blob_bytes` of blob files fits both the storage
/// quota headroom and the free space of the target volumes.
///
/// If the tables folder and the blob folder report different volume ids,
/// each volume must hold its own share of the output. Otherwise, including
/// when either id is unknown, the combined output must fit in the smaller of
/// the two free-space figures. Free space that cannot be queried is treated
/// as unlimited.
pub fn space_fits_two_layer(
    config: &Config,
    quota_headroom: u64,
    sst_bytes: u64,
    sst_dest_level: u8,
    blob_bytes: u64,
) -> bool {
    const RESERVE: u64 = crate::tree::MIN_RESERVED_HEADROOM;

    // The quota applies to the combined transient output regardless of how
    // many volumes are involved.
    let combined = sst_bytes + blob_bytes;
    if combined > quota_headroom {
        return false;
    }

    // NOTE(review): the first disjunct already covers every case the second
    // one accepts, so this reduces to `demand <= free` and RESERVE is never
    // actually enforced — confirm whether the reserve should be binding.
    let fits_on = |demand: u64, free: u64| -> bool {
        demand <= free || (free >= RESERVE && demand <= free - RESERVE)
    };

    let (sst_dir, sst_fs) = config.tables_folder_for_level(sst_dest_level);
    let sst_free = sst_fs.available_space(&sst_dir).unwrap_or(u64::MAX);
    let blob_dir = config.path.join(BLOBS_FOLDER);
    let blob_free = config.fs.available_space(&blob_dir).unwrap_or(u64::MAX);

    let shares_volume = !matches!(
        (sst_fs.volume_id(&sst_dir), config.fs.volume_id(&blob_dir)),
        (Some(sst_vol), Some(blob_vol)) if sst_vol != blob_vol
    );

    if shares_volume {
        fits_on(combined, sst_free.min(blob_free))
    } else {
        fits_on(sst_bytes, sst_free) && fits_on(blob_bytes, blob_free)
    }
}
/// Decides whether a merge `payload` may run given the storage quota and free
/// disk space, possibly narrowing it to a smaller payload that fits.
///
/// When `storage_admission_check` is disabled the merge always runs. The
/// transient demand is the total size of the payload's tables plus, with KV
/// separation, the physical size of the blob files that would be rewritten.
///
/// # Errors
///
/// Propagates errors from computing used bytes, picking blob files, or
/// reading blob file sizes.
fn space_gate_for_merge(
    version: &Version,
    opts: &Options,
    payload: &CompactionPayload,
) -> crate::Result<SpaceGate> {
    let runtime = opts.runtime_config.load_full();
    if !runtime.storage_admission_check {
        return Ok(SpaceGate::Run);
    }

    let quota_headroom = if let Some(limit) = runtime.storage_limit_bytes {
        limit.saturating_sub(crate::storage_stats::compute_used_bytes(version)?)
    } else {
        u64::MAX
    };

    // Whether `candidate`'s transient output fits quota and free space.
    let payload_fits = |candidate: &CompactionPayload| -> crate::Result<bool> {
        let sst_bytes = candidate
            .table_ids
            .iter()
            .filter_map(|&id| version.get_table(id))
            .map(Table::file_size)
            .sum::<u64>();

        let mut blob_bytes = 0u64;
        if let Some(blob_opts) = &opts.config.kv_separation_opts {
            for blob_file in pick_blob_files_to_rewrite(&candidate.table_ids, version, blob_opts)? {
                blob_bytes += blob_file.physical_size()?;
            }
        }

        Ok(space_fits_two_layer(
            &opts.config,
            quota_headroom,
            sst_bytes,
            candidate.dest_level,
            blob_bytes,
        ))
    };

    if payload_fits(payload)? {
        return Ok(SpaceGate::Run);
    }

    // Candidates come back smallest first, so the first fit is the cheapest.
    let smaller = narrow_merge_candidates(version, payload);
    for candidate in smaller {
        if payload_fits(&candidate)? {
            return Ok(SpaceGate::Narrowed(candidate));
        }
    }

    Ok(SpaceGate::Skip)
}
/// Proposes smaller merge payloads for a `payload` that does not fit.
///
/// Only applies when every payload table belongs to one run. Each candidate
/// is a pair of adjacent tables in that run that are both part of the
/// payload. Candidates are ordered by their combined file size, ascending;
/// ties keep run order. Returns an empty list if no single run holds the
/// whole payload.
fn narrow_merge_candidates(
    version: &Version,
    payload: &CompactionPayload,
) -> Vec<CompactionPayload> {
    let wanted = &payload.table_ids;

    let owning_run = version
        .iter_levels()
        .flat_map(|level| level.iter())
        .find(|run| {
            wanted
                .iter()
                .all(|id| run.iter().any(|table| table.id() == *id))
        });
    let Some(owning_run) = owning_run else {
        return Vec::new();
    };

    let ordered: Vec<&Table> = owning_run.iter().collect();
    let mut scored: Vec<(u64, CompactionPayload)> = ordered
        .windows(2)
        .filter_map(|window| match window {
            [left, right] if wanted.contains(&left.id()) && wanted.contains(&right.id()) => {
                Some((
                    left.file_size() + right.file_size(),
                    CompactionPayload {
                        table_ids: [left.id(), right.id()].into_iter().collect(),
                        dest_level: payload.dest_level,
                        canonical_level: payload.canonical_level,
                        target_size: payload.target_size,
                    },
                ))
            }
            _ => None,
        })
        .collect();

    // Stable sort so that equally sized pairs keep their run order.
    scored.sort_by_key(|(bytes, _)| *bytes);
    scored.into_iter().map(|(_, candidate)| candidate).collect()
}
/// Returns the positions of the first and last tables of `run` that are
/// listed in `to_compact`, or `None` if the run contains none of them.
fn pick_run_indexes(run: &Run<Table>, to_compact: &[TableId]) -> Option<(usize, usize)> {
    let is_selected = |id: TableId| to_compact.contains(&id);
    let first = run.iter().position(|table| is_selected(table.id()));
    let last = run.iter().rposition(|table| is_selected(table.id()));
    first.zip(last)
}
/// Builds a merged compaction stream over the tables in `to_compact`.
///
/// Runs with more than one table are read through a `RunScanner` culled to
/// the span between the first and last selected table. Single-table runs are
/// scanned directly when selected. Returns `Ok(None)` when the number of
/// tables covered differs from `to_compact.len()`, for example when a table is
/// no longer in `version`.
///
/// NOTE(review): the run-scanner branch counts every table in `lo..=hi`, so it
/// assumes a run's selected tables are contiguous. A gap inside that span would
/// read an unselected table and skew `found`.
///
/// # Errors
///
/// Propagates errors from opening run scanners or table scans.
fn create_compaction_stream<'a>(
    version: &Version,
    to_compact: &[TableId],
    eviction_seqno: SeqNo,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> crate::Result<Option<CompactionStream<'a, Merger<CompactionReader<'a>>>>> {
    let mut readers: Vec<CompactionReader<'_>> = vec![];
    // Number of tables the readers cover; checked against the request below.
    let mut found = 0;
    for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
        if run.len() > 1 {
            let Some((lo, hi)) = pick_run_indexes(run, to_compact) else {
                continue;
            };
            readers.push(Box::new(RunScanner::culled(
                run.clone(),
                (Some(lo), Some(hi)),
            )?));
            found += hi - lo + 1;
        } else {
            for table in run.iter().filter(|x| to_compact.contains(&x.metadata.id)) {
                found += 1;
                readers.push(Box::new(table.scan()?));
            }
        }
    }
    Ok(if found == to_compact.len() {
        Some(
            CompactionStream::new(Merger::new(readers, comparator), eviction_seqno)
                .with_merge_operator(merge_operator),
        )
    } else {
        None
    })
}
/// Builds a merged compaction stream over the part of each table in
/// `to_compact` that lies within `bounds`.
///
/// Every selected table gets its own range reader. Returns `None` when fewer
/// (or more) tables were found in `version` than `to_compact` lists.
#[cfg(feature = "std")]
fn create_bounded_compaction_stream<'a>(
    version: &'a Version,
    to_compact: &HashSet<TableId>,
    bounds: (core::ops::Bound<UserKey>, core::ops::Bound<UserKey>),
    eviction_seqno: SeqNo,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> Option<CompactionStream<'a, Merger<CompactionReader<'a>>>> {
    let readers: Vec<CompactionReader<'a>> = version
        .iter_levels()
        .flat_map(|lvl| lvl.iter())
        .flat_map(|run| run.iter())
        .filter(|table| to_compact.contains(&table.metadata.id))
        .map(|table| -> CompactionReader<'a> { Box::new(table.range(bounds.clone())) })
        .collect();

    let all_present = readers.len() == to_compact.len();
    all_present.then(|| {
        CompactionStream::new(Merger::new(readers, comparator), eviction_seqno)
            .with_merge_operator(merge_operator)
    })
}
/// Gathers the range tombstones of every table in every run of `version`, in
/// level / run / table order (duplicates are kept).
#[cfg(feature = "std")]
fn collect_version_tombstones(version: &Version) -> Vec<crate::range_tombstone::RangeTombstone> {
    let mut tombstones = Vec::new();
    for level in version.iter_levels() {
        for run in level.iter() {
            for table in run.iter() {
                tombstones.extend(table.range_tombstones().iter().cloned());
            }
        }
    }
    tombstones
}
/// Chooses which input range tombstones must be written to the output.
///
/// Outside the last level every tombstone is kept. At the last level a
/// tombstone is kept if it is not yet visible at `watermark`, or if it
/// overlaps the key range of some table that is not part of the compaction
/// (`input_ids`).
#[cfg(feature = "std")]
fn range_tombstones_after_gc(
    input_rts: &[crate::range_tombstone::RangeTombstone],
    version: &Version,
    input_ids: &HashSet<TableId>,
    watermark: SeqNo,
    is_last_level: bool,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<crate::range_tombstone::RangeTombstone> {
    if !is_last_level {
        return input_rts.to_vec();
    }

    let cmp = comparator.as_ref();

    // start <= table.max && table.min < end, against every non-input table.
    let overlaps_untouched_table = |rt: &crate::range_tombstone::RangeTombstone| {
        version
            .iter_levels()
            .flat_map(|level| level.iter())
            .flat_map(|run| run.iter())
            .filter(|table| !input_ids.contains(&table.id()))
            .any(|table| {
                let range = &table.metadata.key_range;
                cmp.compare(&rt.start, range.max()).is_le()
                    && cmp.compare(range.min(), &rt.end).is_lt()
            })
    };

    let mut kept = Vec::new();
    for rt in input_rts {
        if !rt.visible_at(watermark) || overlaps_untouched_table(rt) {
            kept.push(rt.clone());
        }
    }
    kept
}
/// Turns a bag of keys into sorted, comparator-deduplicated split-point
/// candidates, excluding the greatest key.
///
/// Returns an empty list when fewer than two keys are supplied.
fn boundary_candidates(
    mut keys: Vec<UserKey>,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<UserKey> {
    if keys.len() >= 2 {
        keys.sort_by(|left, right| comparator.compare(left, right));
        keys.dedup_by(|left, right| {
            comparator.compare(left, right) == core::cmp::Ordering::Equal
        });
        // The greatest key is never used as a split point.
        keys.pop();
        keys
    } else {
        Vec::new()
    }
}
/// Picks up to `max_ranges - 1` split keys for dividing a merge into
/// sub-compactions.
///
/// Candidates are the max keys of the tables in level `dest_level` (see
/// `boundary_candidates`). If there are more candidates than needed, an
/// evenly spaced subset is taken. Returns an empty list when
/// `max_ranges < 2`, the level does not exist, or there are no candidates.
#[cfg(feature = "std")]
fn subcompaction_boundaries(
    version: &Version,
    dest_level: usize,
    max_ranges: usize,
    comparator: &crate::comparator::SharedComparator,
) -> Vec<UserKey> {
    if max_ranges < 2 {
        return Vec::new();
    }
    let Some(level) = version.level(dest_level) else {
        return Vec::new();
    };

    let table_max_keys: Vec<UserKey> = level
        .iter()
        .flat_map(|run| run.iter())
        .map(|table| table.metadata.key_range.max().clone())
        .collect();
    let candidates = boundary_candidates(table_max_keys, comparator);
    let count = candidates.len();
    if count == 0 {
        return Vec::new();
    }

    let want = (max_ranges - 1).min(count);
    if want == count {
        return candidates;
    }

    // Sample `want` evenly spaced candidates.
    let mut picked: Vec<UserKey> = (1..=want)
        .filter_map(|i| candidates.get(((i * count) / (want + 1)).min(count - 1)).cloned())
        .collect();
    picked.dedup();
    picked
}
/// Converts sorted split keys into contiguous key ranges that cover the
/// whole keyspace: `(-inf, b0)`, `[b0, b1)`, ..., `[bn, +inf)`.
///
/// An empty `boundaries` slice yields one unbounded range.
#[cfg(feature = "std")]
fn ranges_from_boundaries(
    boundaries: &[UserKey],
) -> Vec<(core::ops::Bound<UserKey>, core::ops::Bound<UserKey>)> {
    use core::iter::once;
    use core::ops::Bound::{Excluded, Included, Unbounded};

    let lowers = once(Unbounded).chain(boundaries.iter().cloned().map(Included));
    let uppers = boundaries
        .iter()
        .cloned()
        .map(Excluded)
        .chain(once(Unbounded));
    lowers.zip(uppers).collect()
}
/// Error returned when a (sub-)compaction is aborted by the stop signal: an
/// `Interrupted` I/O error wrapped in the crate error type.
#[cfg(feature = "std")]
fn cancelled_compaction() -> crate::Error {
    use crate::io::{Error as IoError, ErrorKind};

    IoError::new(
        ErrorKind::Interrupted,
        "sub-compaction cancelled by stop signal",
    )
    .into()
}
/// Computes slice split keys for tight-space compaction.
///
/// All data-block handles of `inputs` are sorted by end key. Their sizes are
/// accumulated, and a split is emitted at a block's end key once the running
/// total reaches `slice_budget`. The global maximum key is never a split
/// point, and the same key is not emitted twice in a row. Returns an empty
/// list when the inputs have no blocks.
///
/// # Errors
///
/// Propagates errors from iterating a table's block index.
#[cfg(feature = "std")]
fn tight_slice_boundaries(
    inputs: &[Table],
    slice_budget: u64,
    cmp: &dyn crate::comparator::UserComparator,
) -> crate::Result<Vec<UserKey>> {
    use crate::table::block_index::BlockIndex;

    let mut blocks: Vec<(UserKey, u32)> = Vec::new();
    for table in inputs {
        for handle in table.block_index.iter() {
            let handle = handle?;
            blocks.push((handle.end_key().clone(), handle.size()));
        }
    }
    blocks.sort_by(|(lhs, _), (rhs, _)| cmp.compare(lhs, rhs));

    let Some((global_max, _)) = blocks.last() else {
        return Ok(Vec::new());
    };

    let mut cuts: Vec<UserKey> = Vec::new();
    let mut pending = 0u64;
    for (key, size) in &blocks {
        pending += u64::from(*size);
        let over_budget = pending >= slice_budget;
        let is_global_max = key == global_max;
        let repeats_previous = cuts.last() == Some(key);
        if over_budget && !is_global_max && !repeats_previous {
            cuts.push(key.clone());
            pending = 0;
        }
    }
    Ok(cuts)
}
/// Builds the payload for one tight-space slice: the ids of `views` combined
/// with the level/size settings of the original `payload`.
#[cfg(feature = "std")]
fn slice_payload_for(views: &[Table], payload: &CompactionPayload) -> CompactionPayload {
    let table_ids = views.iter().map(Table::id).collect();
    CompactionPayload {
        table_ids,
        dest_level: payload.dest_level,
        target_size: payload.target_size,
        canonical_level: payload.canonical_level,
    }
}
/// Tight-space fallback for a merge that the space gate skipped.
///
/// The payload is compacted in key-ordered slices whose boundaries come from
/// `tight_slice_boundaries`, sized from the free space currently available,
/// and a new version is installed after every slice. Nothing is done unless
/// the destination backend reports `punch_hole` support.
///
/// After each slice is installed:
/// * inputs whose max key is below the slice boundary are marked deleted;
/// * the remaining inputs are replaced by views reopened with
///   `reopen_restricted(boundary)`, and the old views are marked to punch at
///   `punch_offset_for(boundary)` when dropped;
/// * when relocating stale blob files, partially consumed stale files are
///   reopened and their old handles are marked to punch at the recorded
///   resume offset.
///
/// The part after the last boundary is compacted as an ordinary merge via
/// `install_merge`. The payload's tables are hidden for the duration and
/// shown again afterwards on both the success and the error path. Slices
/// installed before a failure stay installed.
///
/// # Errors
///
/// Propagates I/O, sub-compaction, version-install and maintenance errors.
#[cfg(feature = "std")]
fn run_tight_space_compaction(
    mut compaction_state: CompactionGuard<'_>,
    version_history_lock: VersionsReadGuard<'_>,
    opts: &Options,
    payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
    use core::ops::Bound;

    // Same pre-flight checks that `merge_tables` applies to this payload.
    if opts.stop_signal.is_stopped() {
        log::debug!("Stopping before tight-space compaction because of stop signal");
        return Ok(CompactionResult::nothing());
    }
    if compaction_state
        .hidden_set()
        .should_decline_compaction(payload.table_ids.iter().copied())
    {
        log::warn!(
            "Tight-space compaction task created by {:?} contained hidden tables, declining to run it",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    }

    let (dest_path, dest_fs) = opts.config.tables_folder_for_level(payload.dest_level);
    if !dest_fs.capabilities(&dest_path).punch_hole {
        log::info!("Tight-space compaction unavailable: backend lacks punch_hole");
        return Ok(CompactionResult::nothing());
    }

    let latest = version_history_lock.latest_version();
    // Bail out if any payload table is missing from the latest version.
    let Some(inputs) = payload
        .table_ids
        .iter()
        .map(|&id| latest.version.get_table(id).cloned())
        .collect::<Option<Vec<_>>>()
    else {
        return Ok(CompactionResult::nothing());
    };

    // Stale blob files referenced only by the inputs are relocated slice by
    // slice together with the tables.
    let (stale_ids, stale_total_bytes): (Vec<BlobFileId>, u64) =
        match &opts.config.kv_separation_opts {
            Some(blob_opts) => {
                let picked: HashSet<TableId> = payload.table_ids.iter().copied().collect();
                let files = pick_blob_files_to_rewrite(&picked, &latest.version, blob_opts)?;
                let mut total = 0u64;
                for bf in &files {
                    total += bf.physical_size()?;
                }
                (files.iter().map(BlobFile::id).collect(), total)
            }
            None => (Vec::new(), 0),
        };
    let relocating = !stale_ids.is_empty();
    drop(latest);
    drop(version_history_lock);

    if inputs.is_empty() {
        return Ok(CompactionResult::nothing());
    }
    let tables_in = inputs.len();
    let comparator = opts.config.comparator.clone();

    // Per-slice budget, expressed in input-table bytes. When relocating, the
    // free space is scaled by inputs_total / stale_total_bytes.
    let available = dest_fs.available_space(&dest_path).unwrap_or(u64::MAX);
    let slice_budget = if relocating && stale_total_bytes > 0 {
        let inputs_total: u64 = inputs.iter().map(Table::file_size).sum();
        let scaled =
            u128::from(available) * u128::from(inputs_total.max(1)) / u128::from(stale_total_bytes);
        u64::try_from(scaled).unwrap_or(u64::MAX).max(1)
    } else {
        available.max(1)
    };
    let boundaries = tight_slice_boundaries(&inputs, slice_budget, comparator.as_ref())?;
    // Without an interior split point, slicing cannot help.
    if boundaries.is_empty() {
        return Ok(CompactionResult::nothing());
    }

    // Hidden from here on; shown again after `result` is computed below.
    compaction_state
        .hidden_set_mut()
        .hide(payload.table_ids.iter().copied());

    let dst_lvl: usize = payload.canonical_level.into();
    let is_last_level = payload.dest_level == opts.config.level_count - 1;
    let blobs_folder = opts.config.path.join(BLOBS_FOLDER);
    let mut rts: Vec<crate::range_tombstone::RangeTombstone> = inputs
        .iter()
        .flat_map(|t| t.range_tombstones().iter().cloned())
        .collect();
    rts.sort();
    rts.dedup();

    // The current input handles; replaced by restricted views after each slice.
    let mut current_views = inputs;
    let mut lower: Bound<UserKey> = Bound::Unbounded;
    // Per stale blob file, the furthest offset consumed so far.
    let mut resume_offsets: crate::HashMap<BlobFileId, u64> = crate::HashMap::default();

    let result = (|| -> crate::Result<usize> {
        let mut tables_out = 0usize;
        for boundary in &boundaries {
            // 1. Compact [lower, boundary) against the latest version.
            let slice_payload = slice_payload_for(&current_views, payload);
            let version = opts.version_history.read().latest_version();
            let current_stale: Vec<BlobFile> = if relocating {
                stale_ids
                    .iter()
                    .filter_map(|id| version.version.blob_files.get(*id).cloned())
                    .collect()
            } else {
                Vec::new()
            };
            let reloc = if relocating {
                Some(RelocationSetup {
                    stale_files: current_stale.clone(),
                    resume_offsets: resume_offsets.clone(),
                })
            } else {
                None
            };
            let produced = run_subcompaction(
                opts,
                &slice_payload,
                &version.version,
                Vec::new(),
                &rts,
                (lower.clone(), Bound::Excluded(boundary.clone())),
                dst_lvl,
                is_last_level,
                &blobs_folder,
                reloc,
            )?;
            drop(version);

            let outputs: Vec<Table> = produced.created_tables().to_vec();
            let mut new_blobs: Vec<BlobFile> = produced.created_blob_files().to_vec();
            // Only blob files written by this slice may be deleted if the
            // install fails. The reopened stale files pushed below presumably
            // refer to the same on-disk files as the still-installed originals,
            // so deleting them would destroy live data.
            let blobs_for_cleanup = new_blobs.clone();
            let frag = produced.blob_frag_map().clone();
            let gc_diff = if frag.is_empty() { None } else { Some(frag) };
            if relocating {
                // Advance resume offsets; never move one backwards.
                for (id, fe) in produced.consumed_through() {
                    let slot = resume_offsets.entry(*id).or_insert(0);
                    *slot = (*slot).max(*fe);
                }
            }
            drop(produced);

            // 2. Reopen partially consumed stale blob files for the new version
            //    and remember where their old handles should punch.
            let mut prior_to_punch: Vec<(BlobFile, u64)> = Vec::new();
            if relocating {
                for sf in &current_stale {
                    if let Some(off) = resume_offsets.get(&sf.id()).copied() {
                        new_blobs.push(sf.reopen()?);
                        prior_to_punch.push((sf.clone(), off));
                    }
                }
            }

            // 3. Split the current views at the boundary: inputs ending before
            //    it are fully consumed; the others are reopened restricted.
            // NOTE(review): a failure in steps 2-3 returns before the install,
            // leaving `outputs` and the slice's new blob files unreferenced.
            let mut restricted_pairs: Vec<(TableId, Table)> = Vec::new();
            let mut removed_ids: Vec<TableId> = Vec::new();
            let mut next_views: Vec<Table> = Vec::new();
            for view in &current_views {
                if comparator.compare(view.metadata.key_range.max(), boundary)
                    == core::cmp::Ordering::Less
                {
                    removed_ids.push(view.id());
                } else {
                    let restricted = view.reopen_restricted(boundary.clone())?;
                    restricted_pairs.push((view.id(), restricted.clone()));
                    next_views.push(restricted);
                }
            }

            // 4. Install the slice as a new version.
            let install = opts.version_history.write().upgrade_version(
                &opts.config.path,
                |sv| {
                    let mut copy = sv.clone();
                    let ctx = crate::version::TransformContext::new(comparator.as_ref());
                    copy.version = copy.version.with_tight_slice(
                        &restricted_pairs,
                        &removed_ids,
                        &outputs,
                        new_blobs,
                        gc_diff,
                        payload.dest_level as usize,
                        &ctx,
                    );
                    Ok(copy)
                },
                &opts.global_seqno,
                &opts.visible_seqno,
                &*opts.config.fs,
                opts.runtime_config.load_full(),
                opts.encryption.clone(),
            );
            if let Err(e) = install {
                for t in &outputs {
                    t.mark_as_deleted();
                }
                for b in &blobs_for_cleanup {
                    b.mark_as_deleted();
                }
                return Err(e);
            }

            // 5. Retire consumed inputs and schedule punching of the old handles.
            for view in &current_views {
                if removed_ids.contains(&view.id()) {
                    view.mark_as_deleted();
                } else {
                    view.mark_punch_on_drop(view.punch_offset_for(boundary)?);
                }
            }
            for (bf, off) in &prior_to_punch {
                bf.mark_punch_on_drop(*off);
            }

            tables_out += outputs.len();
            current_views = next_views;
            lower = Bound::Included(boundary.clone());
            drop(prior_to_punch);
            drop(current_stale);
            opts.version_history.write().drain_obsolete_to_latest();

            // Test-only fault injection after the first installed slice.
            #[cfg(test)]
            if opts
                .config
                .fail_tight_after_first_slice
                .swap(false, core::sync::atomic::Ordering::SeqCst)
            {
                return Err(cancelled_compaction());
            }

            if current_views.is_empty() {
                return Ok(tables_out);
            }
        }

        // 6. Tail: [last boundary, +inf) over the remaining views, passed as
        //    tables_for_deletion and installed via `install_merge`.
        let slice_payload = slice_payload_for(&current_views, payload);
        let version = opts.version_history.read().latest_version();
        let tail_reloc = if relocating {
            let current_stale: Vec<BlobFile> = stale_ids
                .iter()
                .filter_map(|id| version.version.blob_files.get(*id).cloned())
                .collect();
            Some(RelocationSetup {
                stale_files: current_stale,
                resume_offsets: resume_offsets.clone(),
            })
        } else {
            None
        };
        let produced = run_subcompaction(
            opts,
            &slice_payload,
            &version.version,
            current_views.clone(),
            &rts,
            (lower.clone(), Bound::Unbounded),
            dst_lvl,
            is_last_level,
            &blobs_folder,
            tail_reloc,
        )?;
        drop(version);
        let tail_out = produced.created_tables().len();
        super::flavour::install_merge(
            &mut opts.version_history.write(),
            opts,
            &slice_payload,
            vec![produced],
        )?;
        Ok(tables_out + tail_out)
    })();

    // Show the inputs again regardless of the outcome.
    compaction_state
        .hidden_set_mut()
        .show(payload.table_ids.iter().copied());
    let tables_out = result?;
    opts.version_history.write().maintenance(
        &opts.config.path,
        opts.mvcc_gc_watermark,
        &*opts.config.fs,
    )?;
    Ok(CompactionResult {
        action: CompactionAction::Merged,
        dest_level: Some(payload.dest_level),
        tables_in,
        tables_out,
    })
}
/// Blob relocation inputs for one sub-compaction.
#[cfg(feature = "std")]
struct RelocationSetup {
    /// Stale blob files whose live values are rewritten by this sub-compaction.
    stale_files: Vec<BlobFile>,
    /// Per blob file id, the offset at which scanning resumes
    /// (`BlobFileScanner::resume`). Files without an entry are scanned from
    /// the start.
    resume_offsets: crate::HashMap<BlobFileId, u64>,
}
/// Compacts the part of `payload`'s tables that lies within `bounds` and
/// returns the produced output without installing it; the caller installs it.
///
/// * `tables_for_deletion` is handed to the compaction flavour. Callers pass
///   the input tables to at most one sub-compaction of a merge.
/// * `input_range_tombstones` are filtered by `range_tombstones_after_gc` and
///   written to the output.
/// * `relocation`, when `Some` and KV separation is configured, makes this a
///   relocating compaction. The given stale blob files are scanned, resuming
///   from recorded offsets, and their live values are rewritten to new blob
///   files.
///
/// # Errors
///
/// Returns the cancellation error when the stop signal fires, an error when
/// an input table is no longer present in `version`, and propagates I/O or
/// writer errors. If `produce` fails, blob files written by the compaction
/// filter are marked deleted.
#[cfg(feature = "std")]
#[expect(
    clippy::too_many_arguments,
    reason = "a sub-compaction needs the full compaction context (opts, payload, \
    version, inputs, range, level info, blob folder) threaded in by value/ref \
    so it can run on its own thread; bundling into a struct would just move \
    the argument list"
)]
fn run_subcompaction(
    opts: &Options,
    payload: &CompactionPayload,
    version: &Version,
    tables_for_deletion: Vec<Table>,
    input_range_tombstones: &[crate::range_tombstone::RangeTombstone],
    bounds: (core::ops::Bound<UserKey>, core::ops::Bound<UserKey>),
    dst_lvl: usize,
    is_last_level: bool,
    blobs_folder: &std::path::Path,
    relocation: Option<RelocationSetup>,
) -> crate::Result<super::flavour::ProducedOutput> {
    use super::flavour::CompactionFlavour;
    // Test-only fault injection: fail one sub-compaction.
    #[cfg(test)]
    if opts
        .config
        .fail_one_subcompaction
        .swap(false, std::sync::atomic::Ordering::SeqCst)
    {
        return Err(cancelled_compaction());
    }
    let mut blob_frag_map = FragmentationMap::default();
    // Merge the `bounds` slice of every input table.
    let Some(mut merge_iter) = create_bounded_compaction_stream(
        version,
        &payload.table_ids,
        bounds,
        opts.mvcc_gc_watermark,
        opts.config.merge_operator.clone(),
        opts.config.comparator.clone(),
    ) else {
        return Err(crate::Error::from(crate::io::Error::other(
            "sub-compaction input tables disappeared mid-flight",
        )));
    };
    // At the last level, range tombstones of the whole version are applied to
    // the stream and also handed to the seqno zeroer below.
    let version_tombstones = if is_last_level {
        collect_version_tombstones(version)
    } else {
        Vec::new()
    };
    // Tombstone eviction only at the last level; seqno zeroing is left to
    // `BottommostSeqnoZeroer` below.
    merge_iter = merge_iter
        .evict_tombstones(is_last_level)
        .zero_seqnos(false);
    if is_last_level {
        merge_iter = merge_iter.with_range_tombstone_application(
            version_tombstones.clone(),
            opts.config.comparator.clone(),
        );
    }
    // Optional user compaction filter.
    let filter_ctx = Context { is_last_level };
    let mut compaction_filter = opts
        .config
        .compaction_filter_factory
        .as_ref()
        .map(|f| f.make_filter(&filter_ctx));
    // With KV separation, dropped entries are recorded into `blob_frag_map`,
    // which is later passed to `produce`.
    if opts.config.kv_separation_opts.is_some() {
        merge_iter = merge_iter.with_drop_callback(&mut blob_frag_map);
    }
    // Blob writer the filter adapter may lazily create; finished after the loop.
    let mut filter_blob_writer = None;
    let merge_iter = merge_iter.with_filter(StreamFilterAdapter::new(
        compaction_filter.as_deref_mut(),
        opts,
        version,
        blobs_folder,
        &mut filter_blob_writer,
        &filter_ctx,
    ));
    let merge_iter = super::seqno_zeroer::BottommostSeqnoZeroer::new(
        merge_iter,
        is_last_level,
        version_tombstones,
        opts.mvcc_gc_watermark,
        opts.config.comparator.clone(),
    );
    let table_writer = super::flavour::prepare_table_writer(version, opts, payload, false)?;
    // Relocating flavour when stale blob files were handed in (and KV
    // separation is on); standard flavour otherwise.
    let mut compactor: Box<dyn CompactionFlavour> = match relocation {
        Some(reloc) if opts.config.kv_separation_opts.is_some() => {
            #[expect(clippy::expect_used, reason = "guarded by is_some() above")]
            let blob_opts = opts
                .config
                .kv_separation_opts
                .as_ref()
                .expect("kv_separation_opts present");
            let scanner = BlobFileMergeScanner::new(
                reloc
                    .stale_files
                    .iter()
                    .map(|bf| match reloc.resume_offsets.get(&bf.id()) {
                        Some(&off) => BlobFileScanner::resume(&bf.0.path, &*bf.0.fs, bf.id(), off),
                        None => BlobFileScanner::new(&bf.0.path, &*bf.0.fs, bf.id()),
                    })
                    .collect::<crate::Result<Vec<_>>>()?,
            );
            let writer = BlobFileWriter::new(
                opts.blob_file_id_generator.clone(),
                blobs_folder,
                opts.tree_id,
                opts.config.descriptor_table.clone(),
                opts.config.fs.clone(),
            )?
            .use_target_size(blob_opts.file_target_size)
            .use_passthrough_compression(blob_opts.compression)
            .use_sync_mode(opts.config.sync_mode);
            let inner = StandardCompaction::new(table_writer, tables_for_deletion);
            Box::new(RelocatingCompaction::new(
                inner,
                scanner.peekable(),
                writer,
                reloc.stale_files,
                opts.rate_limiter.clone(),
                opts.stop_signal.clone(),
            ))
        }
        _ => Box::new(StandardCompaction::new(table_writer, tables_for_deletion)),
    };
    let output_rts = range_tombstones_after_gc(
        input_range_tombstones,
        version,
        &payload.table_ids,
        opts.mvcc_gc_watermark,
        is_last_level,
        &opts.config.comparator,
    );
    if !output_rts.is_empty() {
        compactor.write_range_tombstones(&output_rts);
    }
    for (idx, item) in merge_iter.enumerate() {
        let item = item?;
        // Throttle by approximate key + value bytes. A `true` result appears
        // to mean the wait was interrupted by the stop signal.
        let io_bytes = item.key.user_key.len() as u64 + item.value.len() as u64;
        if opts
            .rate_limiter
            .request_interruptible(io_bytes, || opts.stop_signal.is_stopped())
        {
            return Err(cancelled_compaction());
        }
        compactor.write(item)?;
        // Additional periodic stop check every 1,000,000 items.
        if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() {
            return Err(cancelled_compaction());
        }
    }
    if let Some(filter) = compaction_filter {
        filter.finish();
    }
    let extra_blob_files = filter_blob_writer
        .map(BlobFileWriter::finish)
        .transpose()?
        .unwrap_or_default();
    // Kept so the filter's blob files can be marked deleted if `produce` fails.
    let rollback_extra_blob_files = extra_blob_files.clone();
    compactor
        .produce(opts, dst_lvl, blob_frag_map, extra_blob_files)
        .inspect_err(|_| {
            for blob_file in &rollback_extra_blob_files {
                blob_file.mark_as_deleted();
            }
        })
}
/// Moves the payload's tables to `payload.dest_level` by installing a new
/// version built with `Version::with_moved`. No table data is read or written
/// here.
///
/// Declines (returns "nothing") if any payload table is hidden in the
/// compaction state. Manifest maintenance runs after a successful install.
///
/// # Errors
///
/// Propagates version-install and manifest-maintenance errors.
#[expect(
    clippy::significant_drop_tightening,
    reason = "version_history_lock must be held across upgrade_version and maintenance"
)]
fn move_tables(
    compaction_state: &CompactionGuard<'_>,
    opts: &Options,
    payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
    // Held until the end of the function, across install and maintenance.
    let mut version_history_lock = opts.version_history.write();
    if compaction_state
        .hidden_set()
        .should_decline_compaction(payload.table_ids.iter().copied())
    {
        log::warn!(
            "Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    }
    let table_count = payload.table_ids.len();
    let table_ids = payload.table_ids.iter().copied().collect::<Vec<_>>();
    version_history_lock.upgrade_version(
        &opts.config.path,
        |current| {
            let mut copy = current.clone();
            let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
            copy.version = copy
                .version
                .with_moved(&table_ids, payload.dest_level as usize, &ctx);
            Ok(copy)
        },
        &opts.global_seqno,
        &opts.visible_seqno,
        &*opts.config.fs,
        opts.runtime_config.load_full(),
        opts.encryption.clone(),
    )?;
    if let Err(e) = version_history_lock.maintenance(
        &opts.config.path,
        opts.mvcc_gc_watermark,
        &*opts.config.fs,
    ) {
        log::error!("Manifest maintenance failed: {e:?}");
        return Err(e);
    }
    // A move neither adds nor removes tables.
    Ok(CompactionResult {
        action: CompactionAction::Moved,
        dest_level: Some(payload.dest_level),
        tables_in: table_count,
        tables_out: table_count,
    })
}
pub fn pick_blob_files_to_rewrite(
picked_tables: &HashSet<TableId>,
current_version: &Version,
blob_opts: &crate::KvSeparationOptions,
) -> crate::Result<Vec<BlobFile>> {
use crate::Table;
let linked_blob_files = picked_tables
.iter()
.map(|&id| {
current_version.get_table(id).unwrap_or_else(|| {
panic!("Table {id} should exist");
})
})
.map(Table::list_blob_file_references)
.collect::<Result<Vec<_>, _>>()?;
let mut linked_blob_files = linked_blob_files
.into_iter()
.flatten()
.flatten()
.map(|blob_file_ref| {
current_version
.blob_files
.get(blob_file_ref.blob_file_id)
.unwrap_or_else(|| {
panic!("Blob file {} should exist", blob_file_ref.blob_file_id);
})
})
.filter(|blob_file| {
blob_file.is_stale(current_version.gc_stats(), blob_opts.staleness_threshold)
})
.filter(|blob_file| {
!blob_file.is_dead(current_version.gc_stats())
})
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
linked_blob_files.sort_by_key(|a| a.id());
#[expect(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "precision loss and truncation are acceptable for cutoff calculation"
)]
let cutoff_point = {
let len = linked_blob_files.len() as f32;
(len * blob_opts.age_cutoff) as usize
};
linked_blob_files.drain(cutoff_point..);
for table in current_version.iter_tables() {
if picked_tables.contains(&table.id()) {
continue;
}
let other_refs = table
.list_blob_file_references()?
.unwrap_or_default()
.into_iter()
.filter(|x| linked_blob_files.iter().any(|bf| bf.id() == x.blob_file_id))
.collect::<Vec<_>>();
for additional_ref in other_refs {
linked_blob_files.retain(|x| x.id() != additional_ref.blob_file_id);
}
}
Ok(linked_blob_files.into_iter().cloned().collect::<Vec<_>>())
}
/// Runs `f`. If it fails, logs the error and un-hides the payload's tables in
/// the shared compaction state before returning the error unchanged.
fn hidden_guard<T>(
    payload: &CompactionPayload,
    opts: &Options,
    f: impl FnOnce() -> crate::Result<T>,
) -> crate::Result<T> {
    let outcome = f();
    if let Err(e) = &outcome {
        log::error!("Compaction failed: {e:?}");
        opts.compaction_state
            .lock()
            .hidden_set_mut()
            .show(payload.table_ids.iter().copied());
    }
    outcome
}
#[expect(clippy::too_many_lines)]
fn merge_tables(
mut compaction_state: CompactionGuard<'_>,
version_history_lock: VersionsReadGuard<'_>,
opts: &Options,
payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
if opts.stop_signal.is_stopped() {
log::debug!("Stopping before compaction because of stop signal");
return Ok(CompactionResult::nothing());
}
if compaction_state
.hidden_set()
.should_decline_compaction(payload.table_ids.iter().copied())
{
log::warn!(
"Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
}
let current_super_version = Arc::new(version_history_lock.latest_version());
let Some(tables) = payload
.table_ids
.iter()
.map(|&id| current_super_version.version.get_table(id).cloned())
.collect::<Option<Vec<_>>>()
else {
log::warn!(
"Compaction task created by {:?} contained tables not referenced in the level manifest",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
};
let tables_in = payload.table_ids.len();
let mut input_range_tombstones: Vec<crate::range_tombstone::RangeTombstone> = tables
.iter()
.flat_map(|t| t.range_tombstones().iter().cloned())
.collect();
input_range_tombstones.sort();
input_range_tombstones.dedup();
#[cfg(feature = "std")]
{
let threads = opts.config.compaction_threads;
let dst_lvl: usize = payload.canonical_level.into();
let is_last_level = payload.dest_level == opts.config.level_count - 1;
let relocating = match &opts.config.kv_separation_opts {
Some(blob_opts) => !pick_blob_files_to_rewrite(
&payload.table_ids,
¤t_super_version.version,
blob_opts,
)?
.is_empty(),
None => false,
};
let total_input_bytes: u64 = tables.iter().map(Table::file_size).sum();
let boundaries = if threads > 1
&& !relocating
&& total_input_bytes >= opts.config.subcompaction_min_bytes
{
subcompaction_boundaries(
¤t_super_version.version,
payload.dest_level as usize,
threads,
&opts.config.comparator,
)
} else {
Vec::new()
};
if !boundaries.is_empty() {
let blobs_folder = opts.config.path.join(BLOBS_FOLDER);
let ranges = ranges_from_boundaries(&boundaries);
drop(version_history_lock);
compaction_state
.hidden_set_mut()
.hide(payload.table_ids.iter().copied());
drop(compaction_state);
let num_ranges = ranges.len();
let only_first_owns_inputs =
|idx: usize| if idx == 0 { tables.clone() } else { Vec::new() };
let outputs: Vec<crate::Result<super::flavour::ProducedOutput>> =
if let Some(spawner) = opts.config.compaction_pool.clone() {
let opts = Arc::new(opts.clone());
let payload = Arc::new(payload.clone());
let version = Arc::clone(¤t_super_version);
let rts = Arc::new(input_range_tombstones.clone());
let blobs = Arc::new(blobs_folder);
let (tx, rx) = std::sync::mpsc::channel();
for (idx, range) in ranges.iter().cloned().enumerate() {
let tx = tx.clone();
let opts = Arc::clone(&opts);
let payload = Arc::clone(&payload);
let version = Arc::clone(&version);
let rts = Arc::clone(&rts);
let blobs = Arc::clone(&blobs);
let tables_for_deletion = only_first_owns_inputs(idx);
spawner.spawn(Box::new(move || {
let out = run_subcompaction(
&opts,
&payload,
&version.version,
tables_for_deletion,
&rts,
range,
dst_lvl,
is_last_level,
&blobs,
None,
);
let _ = tx.send((idx, out));
}));
}
drop(tx);
let mut slots: Vec<Option<crate::Result<super::flavour::ProducedOutput>>> =
(0..num_ranges).map(|_| None).collect();
for (idx, out) in rx {
if let Some(slot) = slots.get_mut(idx) {
*slot = Some(out);
}
}
slots
.into_iter()
.map(|slot| {
slot.unwrap_or_else(|| {
Err(crate::Error::from(crate::io::Error::other(
"sub-compaction worker did not report a result",
)))
})
})
.collect()
} else {
ranges
.iter()
.cloned()
.enumerate()
.map(|(idx, range)| {
run_subcompaction(
opts,
payload,
¤t_super_version.version,
only_first_owns_inputs(idx),
&input_range_tombstones,
range,
dst_lvl,
is_last_level,
&blobs_folder,
None,
)
})
.collect()
};
let mut committed = Vec::with_capacity(outputs.len());
let mut first_err = None;
for out in outputs {
match out {
Ok(done) => committed.push(done),
Err(e) => {
first_err.get_or_insert(e);
}
}
}
if let Some(err) = first_err {
log::error!("Sub-compaction failed: {err:?}");
for done in &committed {
done.rollback_uninstalled();
}
{
let mut state = opts.compaction_state.lock();
state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
}
return Err(err);
}
let outputs = committed;
let mut compaction_state = opts.compaction_state.lock();
let mut version_history_lock = opts.version_history.write();
let tables_out =
super::flavour::install_merge(&mut version_history_lock, opts, payload, outputs)
.inspect_err(|e| {
log::error!("Sub-compaction install failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?;
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| log::error!("Manifest maintenance failed: {e:?}"))?;
drop(version_history_lock);
drop(compaction_state);
log::trace!("Parallel compaction done in {num_ranges} sub-ranges");
return Ok(CompactionResult {
action: CompactionAction::Merged,
dest_level: Some(payload.dest_level),
tables_in,
tables_out,
});
}
}
let mut blob_frag_map = FragmentationMap::default();
let Some(mut merge_iter) = create_compaction_stream(
¤t_super_version.version,
&payload.table_ids.iter().copied().collect::<Vec<_>>(),
opts.mvcc_gc_watermark,
opts.config.merge_operator.clone(),
opts.config.comparator.clone(),
)?
else {
log::warn!(
"Compaction task tried to compact tables that do not exist, declining to run it"
);
return Ok(CompactionResult::nothing());
};
let dst_lvl = payload.canonical_level.into();
let is_last_level = payload.dest_level == opts.config.level_count - 1;
merge_iter = merge_iter
.evict_tombstones(is_last_level)
.zero_seqnos(false);
#[cfg(feature = "std")]
let zeroing_tombstones = if is_last_level {
collect_version_tombstones(¤t_super_version.version)
} else {
Vec::new()
};
#[cfg(feature = "std")]
if is_last_level {
merge_iter = merge_iter.with_range_tombstone_application(
zeroing_tombstones.clone(),
opts.config.comparator.clone(),
);
}
let blobs_folder = opts.config.path.join(BLOBS_FOLDER);
let filter_ctx = Context { is_last_level };
let mut compaction_filter = opts.config.compaction_filter_factory.as_ref().map(|f| {
log::trace!("Installing custom compaction filter {:?}", f.name());
f.make_filter(&filter_ctx)
});
let mut filter_blob_writer = None;
let mut merge_iter = merge_iter.with_filter(StreamFilterAdapter::new(
compaction_filter.as_deref_mut(),
opts,
¤t_super_version.version,
&blobs_folder,
&mut filter_blob_writer,
&filter_ctx,
));
let table_writer =
super::flavour::prepare_table_writer(¤t_super_version.version, opts, payload, true)?;
let start = Instant::now();
let mut compactor = match &opts.config.kv_separation_opts {
Some(blob_opts) => {
merge_iter = merge_iter.with_drop_callback(&mut blob_frag_map);
let blob_files_to_rewrite = pick_blob_files_to_rewrite(
&payload.table_ids,
¤t_super_version.version,
blob_opts,
)?;
if blob_files_to_rewrite.is_empty() {
log::debug!("No blob relocation needed");
Box::new(StandardCompaction::new(table_writer, tables))
as Box<dyn super::flavour::CompactionFlavour>
} else {
log::debug!(
"Relocate blob files: {:?}",
blob_files_to_rewrite
.iter()
.map(BlobFile::id)
.collect::<Vec<_>>(),
);
let scanner = BlobFileMergeScanner::new(
blob_files_to_rewrite
.iter()
.map(|bf| BlobFileScanner::new(&bf.0.path, &*bf.0.fs, bf.id()))
.collect::<crate::Result<Vec<_>>>()?,
);
let writer = BlobFileWriter::new(
opts.blob_file_id_generator.clone(),
&blobs_folder,
opts.tree_id,
opts.config.descriptor_table.clone(),
opts.config.fs.clone(),
)?
.use_target_size(blob_opts.file_target_size)
.use_passthrough_compression(blob_opts.compression)
.use_sync_mode(opts.config.sync_mode);
let inner = StandardCompaction::new(table_writer, tables);
Box::new(RelocatingCompaction::new(
inner,
scanner.peekable(),
writer,
blob_files_to_rewrite,
opts.rate_limiter.clone(),
opts.stop_signal.clone(),
))
}
}
None => Box::new(StandardCompaction::new(table_writer, tables)),
};
log::trace!("Blob file GC preparation done in {:?}", start.elapsed());
drop(version_history_lock);
{
compaction_state
.hidden_set_mut()
.hide(payload.table_ids.iter().copied());
}
drop(compaction_state);
hidden_guard(payload, opts, || {
if !input_range_tombstones.is_empty() {
log::debug!(
"Propagating {} range tombstones to compaction output",
input_range_tombstones.len(),
);
compactor.write_range_tombstones(&input_range_tombstones);
}
#[cfg(feature = "std")]
let merge_iter = super::seqno_zeroer::BottommostSeqnoZeroer::new(
merge_iter,
is_last_level,
zeroing_tombstones,
opts.mvcc_gc_watermark,
opts.config.comparator.clone(),
);
for (idx, item) in merge_iter.enumerate() {
let item = item?;
let io_bytes = item.key.user_key.len() as u64 + item.value.len() as u64;
if opts
.rate_limiter
.request_interruptible(io_bytes, || opts.stop_signal.is_stopped())
{
log::debug!("Stopping amidst compaction because of stop signal (I/O throttle)");
return Ok(());
}
compactor.write(item)?;
if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() {
log::debug!("Stopping amidst compaction because of stop signal");
return Ok(());
}
}
Ok(())
})?;
if let Some(filter) = compaction_filter {
filter.finish();
}
let mut compaction_state = opts.compaction_state.lock();
log::trace!("Acquiring super version write lock");
let mut version_history_lock = opts.version_history.write();
log::trace!("Acquired super version write lock");
log::trace!("Blob fragmentation diff: {blob_frag_map:#?}");
let extra_blob_files = filter_blob_writer
.map(BlobFileWriter::finish)
.transpose()
.inspect_err(|e| {
log::error!("Compaction failed while finishing filter blob writer: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?
.unwrap_or_default();
let rollback_extra_blob_files = extra_blob_files.clone();
let produce_output = compactor
.produce(opts, dst_lvl, blob_frag_map, extra_blob_files)
.inspect_err(|e| {
log::error!("Compaction failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
for blob_file in &rollback_extra_blob_files {
blob_file.mark_as_deleted();
}
})?;
let tables_out = super::flavour::install_merge(
&mut version_history_lock,
opts,
payload,
vec![produce_output],
)
.inspect_err(|e| {
log::error!("Compaction failed: {e:?}");
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
})?;
compaction_state
.hidden_set_mut()
.show(payload.table_ids.iter().copied());
version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| {
log::error!("Manifest maintenance failed: {e:?}");
})?;
drop(version_history_lock);
drop(compaction_state);
log::trace!("Compaction successful");
Ok(CompactionResult {
action: CompactionAction::Merged,
dest_level: Some(payload.dest_level),
tables_in,
tables_out,
})
}
fn drop_tables(
compaction_state: CompactionGuard<'_>,
opts: &Options,
ids_to_drop: &[TableId],
) -> crate::Result<CompactionResult> {
let mut version_history_lock = opts.version_history.write();
if compaction_state
.hidden_set()
.should_decline_compaction(ids_to_drop.iter().copied())
{
log::warn!(
"Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
}
let Some(tables) = ids_to_drop
.iter()
.map(|&id| {
version_history_lock
.latest_version()
.version
.get_table(id)
.cloned()
})
.collect::<Option<Vec<_>>>()
else {
log::warn!(
"Compaction task created by {:?} contained tables not referenced in the level manifest",
opts.strategy.get_name(),
);
return Ok(CompactionResult::nothing());
};
log::debug!("Dropping tables: {ids_to_drop:?}");
let mut dropped_blob_files = vec![];
version_history_lock.upgrade_version(
&opts.config.path,
|current| {
let mut copy = current.clone();
let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
copy.version = copy
.version
.with_dropped(ids_to_drop, &mut dropped_blob_files, &ctx)?;
Ok(copy)
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
opts.runtime_config.load_full(),
opts.encryption.clone(),
)?;
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
drop(version_history_lock);
for table in tables {
table.mark_as_deleted();
}
for blob_file in dropped_blob_files {
blob_file.mark_as_deleted();
}
let tables_dropped = ids_to_drop.len();
drop(compaction_state);
log::trace!("Dropped {tables_dropped} tables");
Ok(CompactionResult {
action: CompactionAction::Dropped,
dest_level: None,
tables_in: tables_dropped,
tables_out: 0,
})
}
// Unit tests for the compaction worker. They cover:
// - sub-compaction boundaries and rollback,
// - tight-space crash recovery,
// - range-tombstone handling at the last level,
// - compaction-stream / run-index helpers,
// - table dropping and blob-file picking,
// - the space-admission gate.
#[cfg(test)]
mod tests {
use super::{create_compaction_stream, pick_run_indexes};
use crate::{
AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter, Table, TableId,
compaction::{Choice, CompactionStrategy, Input, state::CompactionState},
config::BlockSizePolicy,
version::Version,
};
use std::sync::Arc;
// Shadows the built-in `#[test]` attribute for every test in this module.
// Presumably this lets `log` output be captured; see the test-log crate.
use test_log::test;
/// Test comparator that orders keys by their first byte only.
/// Any two keys sharing a first byte (e.g. `a1` and `a2`) compare as equal.
struct FirstByteComparator;
impl crate::comparator::UserComparator for FirstByteComparator {
fn name(&self) -> &'static str {
"test-first-byte"
}
fn compare(&self, a: &[u8], b: &[u8]) -> core::cmp::Ordering {
// `first()` is `None` for an empty key, and `None` orders before any `Some(byte)`.
a.first().cmp(&b.first())
}
}
/// `boundary_candidates` must deduplicate keys that the supplied comparator
/// considers equal, not only byte-identical keys.
#[test]
fn boundary_candidates_dedups_comparator_equal_keys() {
let cmp: crate::comparator::SharedComparator = Arc::new(FirstByteComparator);
// Under `FirstByteComparator`, "a1" and "a2" are equal and "b1" is distinct.
let keys = vec![
crate::UserKey::from("a1"),
crate::UserKey::from("a2"),
crate::UserKey::from("b1"),
];
let out = super::boundary_candidates(keys, &cmp);
// The test expects exactly one candidate, taken from the `a` group.
// NOTE(review): `boundary_candidates` (not visible here) decides why the
// `b` group yields no candidate.
assert_eq!(
out.len(),
1,
"comparator-equal keys must collapse to a single boundary candidate",
);
assert_eq!(
out.first().and_then(|k| k.first()),
Some(&b'a'),
"the surviving boundary should be from the deduped a-group",
);
}
/// A failure in one parallel sub-compaction range must:
/// - abort the whole compaction with an error,
/// - leave the table count unchanged,
/// - keep every key readable with its latest value.
///
/// Only compiled with the `parallel` feature.
#[cfg(feature = "parallel")]
#[test]
fn failed_subcompaction_rolls_back_and_restores_inputs() -> crate::Result<()> {
use core::sync::atomic::Ordering;
const N: u64 = 4_000;
let key = |i: u64| format!("key_{i:08}");
// The generation tag lets the final reads tell old values from new ones.
let val = |i: u64, generation: u64| format!("g{generation}-{i}-{}", "x".repeat(40));
let dir = tempfile::tempdir()?;
// Settings for this test:
// - several compaction threads and no minimum input size, so the size
//   threshold does not stop the compaction from being split into
//   sub-compactions;
// - KV separation with a small threshold, presumably so that the ~50-byte
//   values go to blob files (confirm).
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.with_kv_separation(Some(
KvSeparationOptions::default().separation_threshold(16),
));
// Failpoint handle. Arming it makes one sub-compaction fail; the test later
// expects it to have been reset to `false`.
let failpoint = config.fail_one_subcompaction.clone();
let tree = config.open()?;
// Generation 0 at seqnos 0..N.
for i in 0..N {
tree.insert(key(i), val(i, 0), i);
}
tree.flush_active_memtable(0)?;
// Small target size, presumably to spread generation 0 over several tables
// so sub-compaction boundaries exist (confirm).
tree.major_compact(4_096, 0)?;
// Generation 1 at seqnos N..2N overwrites every key.
for i in 0..N {
tree.insert(key(i), val(i, 1), N + i);
}
tree.flush_active_memtable(0)?;
let tables_before = tree.table_count();
failpoint.store(true, Ordering::SeqCst);
let result = tree.major_compact(u64::MAX, 0);
assert!(
result.is_err(),
"a failing sub-compaction range must abort the compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed itself",
);
assert_eq!(
tree.table_count(),
tables_before,
"rollback must leave nothing partially installed",
);
for i in 0..N {
assert_eq!(
tree.get(key(i), crate::MAX_SEQNO)?.as_deref(),
Some(val(i, 1).as_bytes()),
"value for {} must survive the rolled-back compaction",
key(i),
);
}
Ok(())
}
/// Crash safety of tight-space compaction on a standard (non-blob) tree.
///
/// The compaction is aborted after its first slice has already punched bytes
/// out of the input. Reopening the tree must still find every key.
#[test]
fn tight_space_crash_after_first_slice_recovers_all_keys_on_reopen() -> crate::Result<()> {
use core::sync::atomic::Ordering;
const N: u64 = 2_000;
let k = |i: u64| format!("key{i:08}");
let dir = tempfile::tempdir()?;
// In-memory filesystem with no effective capacity limit; it is capped further down.
let mem = crate::fs::MemFs::with_capacity(u64::MAX);
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.with_shared_fs(Arc::new(mem.clone()));
// Arming this failpoint aborts the compaction after its first tight-space slice.
let failpoint = config.fail_tight_after_first_slice.clone();
let tree = match config.open()? {
crate::AnyTree::Standard(t) => t,
crate::AnyTree::Blob(_) => panic!("expected Standard tree"),
};
for i in 0..N {
tree.insert(k(i).as_bytes(), vec![0xCDu8; 64], i);
}
tree.flush_active_memtable(0)?;
// Cap the volume at current usage + 25%. Then enable admission checks and
// tight-space compaction, presumably forcing the sliced tight-space path.
let used = tree.storage_stats()?.used_bytes;
mem.set_capacity(used + used / 4);
tree.update_runtime_config(|c| {
c.storage_admission_check = true;
c.tight_space_compaction = true;
})?;
failpoint.store(true, Ordering::SeqCst);
assert!(
tree.major_compact(64 * 1024 * 1024, 0).is_err(),
"the crash failpoint must abort the tight-space compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed",
);
assert!(
mem.punched_bytes() > 0,
"the first slice must have punched before the crash",
);
// Simulate the restart: drop the tree and reopen it from the same MemFs.
drop(tree);
let reopened = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(mem))
.open()?;
for i in 0..N {
assert!(
reopened.get(k(i).as_bytes(), crate::MAX_SEQNO)?.is_some(),
"key {i} lost after a crash mid tight-space compaction + reopen",
);
}
Ok(())
}
/// Crash safety of tight-space compaction while relocating blob files.
///
/// The compaction is aborted after its first relocated slice has punched part
/// of a blob file. After reopening, every key must read its latest value.
#[test]
fn tight_space_blob_relocation_crash_after_first_slice_recovers_all_keys() -> crate::Result<()>
{
use core::sync::atomic::Ordering;
const N: u64 = 4_000;
let k = |i: u64| format!("key{i:08}");
// Deterministic 200-byte pseudo-random value from an xorshift generator.
// The seed mixes in the key index and the generation, so different
// generations of the same key produce different bytes.
let val = |i: u64, generation: u8| -> Vec<u8> {
let mut s = (i + 1).wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ (u64::from(generation) << 1);
(0..200u32)
.map(|_| {
s ^= s << 13;
s ^= s >> 7;
s ^= s << 17;
#[expect(
clippy::cast_possible_truncation,
reason = "xorshift byte extraction; the high bits are intentionally dropped"
)]
let byte = (s >> 24) as u8;
byte
})
.collect()
};
let dir = tempfile::tempdir()?;
let mem = crate::fs::MemFs::with_capacity(u64::MAX);
// Blob tree with an aggressive staleness threshold and small blob file
// target size. NOTE(review): presumably chosen so relocation is triggered
// and spans several blob files — confirm.
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.with_shared_fs(Arc::new(mem.clone()))
.with_kv_separation(Some(
KvSeparationOptions::default()
.separation_threshold(64)
.age_cutoff(1.0)
.staleness_threshold(0.1)
.file_target_size(48 * 1024),
));
let failpoint = config.fail_tight_after_first_slice.clone();
let tree = match config.open()? {
crate::AnyTree::Blob(t) => t,
crate::AnyTree::Standard(_) => panic!("expected Blob tree"),
};
// Generation 1 for every key.
for i in 0..N {
tree.insert(k(i).as_bytes(), val(i, 1), i);
}
tree.flush_active_memtable(0)?;
// Generation 2 overwrites only the even keys, leaving stale gen-1 blobs behind.
for i in (0..N).step_by(2) {
tree.insert(k(i).as_bytes(), val(i, 2), N + i);
}
tree.flush_active_memtable(0)?;
// Above every seqno written above (the largest is N + N - 2).
let gc_watermark = 4 * N;
tree.index.update_runtime_config(|c| {
c.storage_admission_check = true;
c.storage_limit_bytes = None;
})?;
// First major compaction runs before tight-space mode is enabled.
tree.major_compact(64 * 1024 * 1024, gc_watermark)?;
// Cap the volume at current usage + 25%, then enable tight-space mode.
let used = tree.storage_stats()?.used_bytes;
mem.set_capacity(used + used / 4);
tree.index.update_runtime_config(|c| {
c.tight_space_compaction = true;
})?;
failpoint.store(true, Ordering::SeqCst);
assert!(
tree.major_compact(64 * 1024 * 1024, gc_watermark).is_err(),
"the crash failpoint must abort the relocating tight-space compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed",
);
assert!(
mem.punched_bytes() > 0,
"the first relocated slice must have punched a stale blob prefix",
);
// Simulate the restart: reopen as a blob tree from the same MemFs.
drop(tree);
let reopened = match Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_kv_separation(Some(
KvSeparationOptions::default().separation_threshold(64),
))
.with_shared_fs(Arc::new(mem))
.open()?
{
crate::AnyTree::Blob(t) => t,
crate::AnyTree::Standard(_) => panic!("expected Blob tree"),
};
// Even keys must read generation 2; odd keys generation 1.
for i in 0..N {
let expected = if i % 2 == 0 { val(i, 2) } else { val(i, 1) };
assert_eq!(
reopened.get(k(i).as_bytes(), crate::MAX_SEQNO)?.as_deref(),
Some(expected.as_slice()),
"key {i} wrong/lost after a crash mid blob-relocation + reopen",
);
}
Ok(())
}
/// Last-level compaction with a range tombstone below the GC watermark:
/// - the covered keys must be physically removed;
/// - the tombstone itself must be garbage-collected from the version.
#[test]
fn last_level_applies_and_gcs_below_watermark_range_tombstone() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.open()?;
let key = |i: u64| format!("k{i:04}");
let val = |i: u64| format!("v{i}-{}", "x".repeat(40));
for i in 0..200u64 {
tree.insert(key(i), val(i), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(4_096, 0)?;
// Tombstone at seqno 1000 over [k0000, k0050).
// Keys 50..200 are then rewritten above it (seqnos 1051..1200).
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0050"),
1000,
);
for i in 50..200u64 {
tree.insert(key(i), val(i), 1001 + i);
}
tree.flush_active_memtable(0)?;
// Watermark 5000 is above the tombstone's seqno.
tree.major_compact(u64::MAX, 5000)?;
for i in 0..50u64 {
assert_eq!(
tree.get(key(i), crate::MAX_SEQNO)?,
None,
"covered key {} must be physically gone after GC",
key(i),
);
}
for i in 50..200u64 {
assert!(
tree.get(key(i), crate::MAX_SEQNO)?.is_some(),
"uncovered key {} must survive",
key(i),
);
}
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
remaining.is_empty(),
"a fully-applied below-watermark tombstone must be GC'd, found {remaining:?}",
);
Ok(())
}
/// A range tombstone whose seqno (100) is above the GC watermark (50) must
/// survive a major compaction.
#[test]
fn above_watermark_range_tombstone_is_retained() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
let key = |i: u64| format!("k{i:04}");
for i in 0..50u64 {
tree.insert(key(i), "v", i);
}
tree.flush_active_memtable(0)?;
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0025"),
100,
);
tree.flush_active_memtable(0)?;
tree.major_compact(u64::MAX, 50)?;
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
!remaining.is_empty(),
"an above-watermark tombstone must be retained, not GC'd",
);
Ok(())
}
/// Boundary case: a range tombstone whose seqno equals the GC watermark (1000).
/// The tombstone must be neither applied nor GC'd, so covered keys stay
/// readable at read seqno 1000.
#[test]
fn range_tombstone_at_exact_watermark_is_not_applied_or_gced() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.open()?;
let key = |i: u64| format!("k{i:04}");
let val = |i: u64| format!("v{i}-{}", "x".repeat(40));
for i in 0..200u64 {
tree.insert(key(i), val(i), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(4_096, 0)?;
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0050"),
1000,
);
for i in 50..200u64 {
tree.insert(key(i), val(i), 1001 + i);
}
tree.flush_active_memtable(0)?;
// Watermark equals the tombstone's seqno.
tree.major_compact(u64::MAX, 1000)?;
for i in 0..50u64 {
assert_eq!(
tree.get(key(i), 1000)?.as_deref(),
Some(val(i).as_bytes()),
"covered key {} must survive: RT@watermark is invisible at read==watermark",
key(i),
);
}
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
!remaining.is_empty(),
"a tombstone at the exact watermark must be retained, not GC'd",
);
Ok(())
}
/// `create_compaction_stream` returns `None` when asked for a table id (666)
/// that does not exist in the version.
#[test]
fn compaction_stream_run_not_found() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
assert!(
create_compaction_stream(
&tree.current_version(),
&[666],
0,
None,
crate::comparator::default_comparator()
)?
.is_none()
);
Ok(())
}
/// `pick_run_indexes` over all three tables of the first L0 run yields the
/// full index span `(0, 2)`.
///
/// The keys `a`, `b` and `c` are disjoint. The assertions rely on the three
/// flushed tables sharing that first run (presumably because they do not
/// overlap — confirm).
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((0, 2)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[0, 1, 2],
)
);
Ok(())
}
/// Same setup as `compaction_stream_run`.
/// Selecting only table 0 yields the single-index span `(0, 0)`.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_2() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((0, 0)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[0],
)
);
Ok(())
}
/// Same setup as `compaction_stream_run`.
/// Selecting only table 2 yields the span `(2, 2)`.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_3() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((2, 2)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[2],
)
);
Ok(())
}
/// Same setup as `compaction_stream_run`.
/// A table id (4) that is not in the run yields `None`.
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_4() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
None,
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[4],
)
);
Ok(())
}
/// A FIFO compaction with a limit of 1 removes all three flushed tables.
/// NOTE(review): the limit is presumably a byte budget — confirm against `Fifo::new`.
#[test]
fn compaction_drop_tables() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
assert_eq!(1, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.insert("b", "a", 1);
tree.flush_active_memtable(0)?;
assert_eq!(2, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.insert("c", "a", 2);
tree.flush_active_memtable(0)?;
assert_eq!(3, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.compact(Arc::new(crate::compaction::Fifo::new(1, None)), 3)?;
assert_eq!(0, tree.table_count());
Ok(())
}
/// Blob fragmentation bookkeeping across range drops and in-place merges.
///
/// `tree.current_version().gc_stats()` must:
/// - record the entry introduced by `drop_range`,
/// - keep it through an unrelated in-place compaction,
/// - be empty once the remaining tables are merged.
/// The single blob file stays in place throughout.
#[test]
fn blob_file_picking_simple() -> crate::Result<()> {
/// Test strategy that always merges a fixed set of tables into level 6
/// (canonical level 6) with a 64 MB target size.
struct InPlaceStrategy(Vec<TableId>);
impl CompactionStrategy for InPlaceStrategy {
fn get_name(&self) -> &'static str {
"InPlaceCompaction"
}
fn choose(&self, _: &Version, _: &Config, _: &CompactionState) -> Choice {
Choice::Merge(Input {
table_ids: self.0.iter().copied().collect(),
dest_level: 6,
target_size: 64_000_000,
canonical_level: 6, })
}
}
let folder = tempfile::tempdir()?;
// Tree configuration:
// - tiny data blocks;
// - a separation threshold of 1 and no blob compression, presumably so
//   every value goes to an uncompressed blob file;
// - age cutoff and staleness thresholds that make blob files eligible for
//   GC (confirm the exact semantics).
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(1))
.with_kv_separation(Some(
KvSeparationOptions::default()
.separation_threshold(1)
.age_cutoff(1.0)
.staleness_threshold(0.01)
.compression(crate::CompressionType::None),
))
.open()?;
tree.insert("a", "a", 0);
tree.insert("b", "b", 0);
tree.insert("c", "c", 0);
tree.flush_active_memtable(1_000)?;
assert_eq!(0, tree.sealed_memtable_count());
assert_eq!(1, tree.table_count());
assert_eq!(1, tree.blob_file_count());
// Target size 1 splits the data into three tables that share one blob file.
tree.major_compact(1, 1_000)?;
assert_eq!(3, tree.table_count());
assert_eq!(1, tree.blob_file_count());
tree.drop_range("a"..="a")?;
assert_eq!(2, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
&{
let mut map = crate::HashMap::default();
map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
map
},
&**tree.current_version().gc_stats(),
);
}
// In-place merge of table 2 alone leaves the fragmentation stats unchanged.
tree.compact(Arc::new(InPlaceStrategy(vec![2])), 1_000)?;
assert_eq!(2, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
&{
let mut map = crate::HashMap::default();
map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
map
},
&**tree.current_version().gc_stats(),
);
}
// Merging the two remaining tables (ids 3 and 4) clears the stats.
tree.compact(Arc::new(InPlaceStrategy(vec![3, 4])), 1_000)?;
assert_eq!(1, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
crate::HashMap::default(),
**tree.current_version().gc_stats(),
);
}
Ok(())
}
/// For a payload covering a whole run, `narrow_merge_candidates` must return:
/// - one two-table candidate per run-adjacent pair,
/// - each keeping the original destination level,
/// - ordered ascending by combined table file size (smallest pair first).
#[expect(
clippy::expect_used,
clippy::indexing_slicing,
reason = "test asserts over known-good fixtures; failure surfaces via panic"
)]
#[test]
fn narrow_merge_candidates_for_full_run_are_adjacent_pairs_sorted_ascending()
-> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.open()?;
for i in 0..3_000u64 {
tree.insert(format!("k{i:08}"), "v".repeat(40), i);
}
tree.flush_active_memtable(0)?;
// A 16 KiB target size is expected to produce a run of at least 3 tables.
tree.major_compact(16 * 1024, 0)?;
let version = tree.current_version();
let run = version
.iter_levels()
.flat_map(|level| level.iter())
.find(|run| run.len() >= 3)
.expect("a bottom-level run with >= 3 tables");
// (id, file size) pairs in run order.
let ordered: Vec<(TableId, u64)> = run.iter().map(|t| (t.id(), t.file_size())).collect();
let payload = Input {
table_ids: ordered.iter().map(|(id, _)| *id).collect(),
dest_level: 6,
canonical_level: 6,
target_size: 64 * 1024 * 1024,
};
let candidates = super::narrow_merge_candidates(&version, &payload);
assert_eq!(
candidates.len(),
ordered.len() - 1,
"one candidate per run-adjacent pair"
);
for c in &candidates {
assert_eq!(c.table_ids.len(), 2, "each candidate is an adjacent pair");
assert_eq!(c.dest_level, 6, "destination preserved");
}
// Sum of the file sizes of a candidate's tables.
let combined = |c: &Input| -> u64 {
c.table_ids
.iter()
.filter_map(|id| version.get_table(*id))
.map(Table::file_size)
.sum()
};
let sums: Vec<u64> = candidates.iter().map(combined).collect();
let mut sorted = sums.clone();
sorted.sort_unstable();
assert_eq!(sums, sorted, "candidates sorted ascending by SST size");
let smallest_pair = ordered
.windows(2)
.map(|w| w[0].1 + w[1].1)
.min()
.expect(">= 2 tables");
assert_eq!(sums[0], smallest_pair, "smallest-Σ pair is tried first");
Ok(())
}
/// `space_fits_two_layer` must combine the two output sizes when both land on
/// the same volume, and check them independently only when the destination
/// level is routed to a different filesystem instance.
///
/// NOTE(review): the positional arguments are not named here. From the
/// assertions:
/// - the 3rd and 5th look like the two output sizes,
/// - the 4th looks like the destination level (6, covered by the routes below),
/// - the 2nd acts as an extra byte budget.
/// Confirm against `space_fits_two_layer`.
#[test]
fn space_fits_two_layer_combines_shared_volume_outputs_and_separates_routed_ones()
-> crate::Result<()> {
use crate::fs::MemFs;
const MIB: u64 = 1024 * 1024;
let dir = tempfile::tempdir()?;
// Single 100 MiB volume shared by every level.
let cfg = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(MemFs::with_capacity(100 * MIB)));
assert!(
!super::space_fits_two_layer(&cfg, u64::MAX, 60 * MIB, 6, 60 * MIB),
"shared-volume outputs must be summed, not checked independently"
);
assert!(super::space_fits_two_layer(
&cfg,
u64::MAX,
60 * MIB,
6,
30 * MIB
));
// 50 + 40 MiB fits the volume but not an 80 MiB budget.
assert!(!super::space_fits_two_layer(
&cfg,
80 * MIB,
50 * MIB,
6,
40 * MIB
));
// Level 6 routed to its own, separate 100 MiB MemFs instance.
let routed = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(MemFs::with_capacity(100 * MIB)))
.level_routes(vec![crate::config::LevelRoute {
levels: 6..7,
path: crate::path::PathBuf::from("/cold-tier"),
fs: Arc::new(MemFs::with_capacity(100 * MIB)),
}]);
assert!(
super::space_fits_two_layer(&routed, u64::MAX, 60 * MIB, 6, 60 * MIB),
"proven-independent volumes are checked independently"
);
assert!(!super::space_fits_two_layer(
&routed,
u64::MAX,
60 * MIB,
6,
130 * MIB
));
// Level 6 routed to a different path on the very same filesystem instance:
// budgets must still be combined.
let shared: Arc<dyn crate::fs::Fs> = Arc::new(MemFs::with_capacity(100 * MIB));
let routed_same_mount = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::clone(&shared))
.level_routes(vec![crate::config::LevelRoute {
levels: 6..7,
path: crate::path::PathBuf::from("/same-mount-subdir"),
fs: Arc::clone(&shared),
}]);
assert!(
!super::space_fits_two_layer(&routed_same_mount, u64::MAX, 60 * MIB, 6, 60 * MIB),
"a route on the same volume must combine budgets, not admit each independently"
);
Ok(())
}
/// When free space is one byte short of the full run's total file size,
/// `space_gate_for_merge` must narrow the merge to an adjacent pair.
/// It must not admit the full run (`Run`) or give up (`Skip`).
#[expect(
clippy::expect_used,
reason = "test asserts over known-good fixtures; failure surfaces via panic"
)]
#[test]
fn space_gate_for_merge_narrows_a_full_run_that_exceeds_free() -> crate::Result<()> {
use crate::fs::MemFs;
let dir = tempfile::tempdir()?;
let mem = MemFs::with_capacity(u64::MAX);
let any = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(mem.clone()))
.data_block_size_policy(BlockSizePolicy::all(512))
.open()?;
let crate::AnyTree::Standard(tree) = any else {
panic!("expected Standard tree");
};
for i in 0..3_000u64 {
tree.insert(format!("k{i:08}"), "v".repeat(40), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(16 * 1024, 0)?;
let version = tree.current_version();
let run = version
.iter_levels()
.flat_map(|level| level.iter())
.find(|run| run.len() >= 3)
.expect("a bottom-level run with >= 3 tables");
// Combined file size of every table in the run.
let run_sigma: u64 = run.iter().map(Table::file_size).sum();
let payload = Input {
table_ids: run.iter().map(Table::id).collect(),
dest_level: 6,
canonical_level: 6,
target_size: 64 * 1024 * 1024,
};
// Probe the bytes already stored: under a known capacity, stored = capacity - available.
let probe_capacity = 1u64 << 40;
mem.set_capacity(probe_capacity);
let stored = probe_capacity
- crate::fs::Fs::available_space(&mem, dir.path()).unwrap_or(probe_capacity);
// Leave free space one byte short of `run_sigma`
// (assuming available = capacity - stored).
mem.set_capacity(stored + run_sigma - 1);
tree.update_runtime_config(|c| {
c.storage_admission_check = true;
c.storage_limit_bytes = None;
})?;
let opts = super::Options::from_tree(
&tree,
Arc::new(crate::compaction::major::Strategy::new(64 * 1024 * 1024)),
);
match super::space_gate_for_merge(&version, &opts, &payload)? {
super::SpaceGate::Narrowed(narrowed) => {
assert_eq!(narrowed.table_ids.len(), 2, "narrowed to an adjacent pair");
}
super::SpaceGate::Run => {
panic!("expected Narrowed, got Run (full run wrongly admitted)")
}
super::SpaceGate::Skip => panic!("expected Narrowed, got Skip (no pair admitted)"),
}
Ok(())
}
}