use super::{CompactionAction, CompactionResult, CompactionStrategy, Input as CompactionPayload};
use crate::{
BlobFile, Config, HashSet, InternalValue, SeqNo, SequenceNumberCounter,
SharedSequenceNumberGenerator, Table, TableId,
blob_tree::FragmentationMap,
compaction::{
Choice,
filter::{Context, StreamFilterAdapter},
flavour::{RelocatingCompaction, StandardCompaction},
state::CompactionState,
stream::CompactionStream,
},
file::BLOBS_FOLDER,
merge::Merger,
run_scanner::RunScanner,
stop_signal::StopSignal,
tree::inner::TreeId,
version::{Run, SuperVersions, Version},
vlog::{BlobFileMergeScanner, BlobFileScanner, BlobFileWriter},
};
use std::{
sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard},
time::Instant,
};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// Boxed iterator over internal values, as consumed by the compaction merge.
pub type CompactionReader<'a> = Box<dyn Iterator<Item = crate::Result<InternalValue>> + 'a>;
/// Settings and shared handles for running compaction on a tree.
pub struct Options {
    /// ID of the tree being compacted.
    pub tree_id: TreeId,

    /// Generator for globally unique sequence numbers.
    pub global_seqno: SharedSequenceNumberGenerator,

    /// Generator tracking the sequence number visible to readers.
    pub visible_seqno: SharedSequenceNumberGenerator,

    /// Counter used to allocate IDs for newly written tables.
    pub table_id_generator: SequenceNumberCounter,

    /// Counter used to allocate IDs for newly written blob files.
    pub blob_file_id_generator: SequenceNumberCounter,

    /// Tree configuration.
    pub config: Arc<Config>,

    /// Version history (super versions) of the tree.
    pub version_history: Arc<RwLock<SuperVersions>>,

    /// Compaction strategy that decides what to compact.
    pub strategy: Arc<dyn CompactionStrategy>,

    /// Signal used to abort long-running compactions (e.g. on shutdown).
    pub stop_signal: StopSignal,

    /// Watermark passed as the eviction seqno to compaction streams and to
    /// manifest maintenance (MVCC garbage collection).
    pub mvcc_gc_watermark: u64,

    /// Shared compaction state (e.g. the hidden set of in-flight tables).
    pub compaction_state: Arc<Mutex<CompactionState>>,

    /// Metrics collector (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    pub metrics: Arc<Metrics>,
}
impl Options {
    /// Builds compaction [`Options`] from a tree handle plus the strategy to run.
    ///
    /// All shared handles are cheap clones (`Arc`/counters); the MVCC GC
    /// watermark starts at 0 and is expected to be set by the caller.
    pub fn from_tree(tree: &crate::Tree, strategy: Arc<dyn CompactionStrategy>) -> Self {
        let config = &tree.config;

        Self {
            tree_id: tree.id,
            strategy,
            global_seqno: config.seqno.clone(),
            visible_seqno: config.visible_seqno.clone(),
            table_id_generator: tree.table_id_counter.clone(),
            blob_file_id_generator: tree.blob_file_id_counter.clone(),
            config: config.clone(),
            version_history: tree.version_history.clone(),
            stop_signal: tree.stop_signal.clone(),
            // Default; callers set the real watermark before running.
            mvcc_gc_watermark: 0,
            compaction_state: tree.compaction_state.clone(),
            #[cfg(feature = "metrics")]
            metrics: tree.metrics.clone(),
        }
    }
}
/// Runs a single compaction round: consults the strategy and executes its choice.
///
/// Locking order: the compaction state mutex is taken first, then the version
/// history read lock — the callees rely on receiving both guards in this order.
pub fn do_compaction(opts: &Options) -> crate::Result<CompactionResult> {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let compaction_state = opts.compaction_state.lock().expect("lock is poisoned");

    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let version_history_lock = opts.version_history.read().expect("lock is poisoned");

    let start = Instant::now();

    log::trace!(
        "Consulting compaction strategy {:?}",
        opts.strategy.get_name(),
    );

    let choice = opts.strategy.choose(
        &version_history_lock.latest_version().version,
        &opts.config,
        &compaction_state,
    );

    log::debug!("Compaction choice: {choice:?} in {:?}", start.elapsed());

    match choice {
        Choice::Merge(payload) => {
            merge_tables(compaction_state, version_history_lock, opts, &payload)
        }
        Choice::Move(payload) => {
            // With level routing, a "trivial" move may still need to relocate
            // table files into a different folder on disk; in that case fall
            // back to a full merge.
            if opts.config.level_routes.is_some() {
                let (dst_folder, _) = opts.config.tables_folder_for_level(payload.dest_level);

                let version = &version_history_lock.latest_version().version;

                // Does any picked table currently live outside the destination folder?
                let cross_folder = version
                    .iter_levels()
                    .flat_map(|level| level.iter())
                    .flat_map(|run| run.iter())
                    .filter(|t| payload.table_ids.contains(&t.id()))
                    .any(|t| t.path.parent() != Some(dst_folder.as_path()));

                if cross_folder {
                    log::debug!("Converting trivial move to merge: cross-folder level routing");
                    return merge_tables(compaction_state, version_history_lock, opts, &payload);
                }
            }

            // move_tables re-acquires the version history as a writer.
            drop(version_history_lock);
            move_tables(&compaction_state, opts, &payload)
        }
        Choice::Drop(payload) => {
            // drop_tables re-acquires the version history as a writer.
            drop(version_history_lock);
            let ids = payload.into_iter().collect::<Vec<_>>();
            drop_tables(compaction_state, opts, &ids)
        }
        Choice::DoNothing => {
            log::trace!("Compactor chose to do nothing");
            Ok(CompactionResult::nothing())
        }
    }
}
/// Finds the index window inside `run` spanning all tables selected for
/// compaction.
///
/// Returns the `(first, last)` positions (inclusive) of selected tables,
/// or `None` when the run contains none of them.
fn pick_run_indexes(run: &Run<Table>, to_compact: &[TableId]) -> Option<(usize, usize)> {
    let mut bounds: Option<(usize, usize)> = None;

    for (idx, table) in run.iter().enumerate() {
        if to_compact.contains(&table.id()) {
            bounds = Some(match bounds {
                // Keep the first hit as the lower bound, extend the upper bound.
                Some((lo, _)) => (lo, idx),
                None => (idx, idx),
            });
        }
    }

    bounds
}
/// Builds a merged compaction stream over the requested tables.
///
/// Tables inside a multi-table run are read through a single culled
/// [`RunScanner`]; singleton runs are scanned per table. Returns `Ok(None)`
/// if not every requested table could be located in the version.
fn create_compaction_stream<'a>(
    version: &Version,
    to_compact: &[TableId],
    eviction_seqno: SeqNo,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> crate::Result<Option<CompactionStream<'a, Merger<CompactionReader<'a>>>>> {
    let mut readers: Vec<CompactionReader<'_>> = vec![];
    let mut found = 0;

    for run in version.iter_levels().flat_map(|lvl| lvl.iter()) {
        if run.len() > 1 {
            // Multi-table run: scan the contiguous window covering all
            // picked tables with one reader.
            let Some((lo, hi)) = pick_run_indexes(run, to_compact) else {
                continue;
            };

            readers.push(Box::new(RunScanner::culled(
                run.clone(),
                (Some(lo), Some(hi)),
            )?));

            // NOTE(review): counts the whole [lo, hi] window, which assumes
            // the picked tables are contiguous within the run — TODO confirm
            // strategies only ever pick contiguous ranges of a run.
            found += hi - lo + 1;
        } else {
            for table in run.iter().filter(|x| to_compact.contains(&x.metadata.id)) {
                found += 1;
                readers.push(Box::new(table.scan()?));
            }
        }
    }

    // Only hand out a stream if every requested table was accounted for.
    Ok(if found == to_compact.len() {
        Some(
            CompactionStream::new(Merger::new(readers, comparator), eviction_seqno)
                .with_merge_operator(merge_operator),
        )
    } else {
        None
    })
}
/// Moves tables to another level without rewriting them (trivial move).
///
/// The caller holds the compaction state guard; this function acquires the
/// version history write lock and keeps it across both the version upgrade
/// and manifest maintenance.
#[expect(
    clippy::significant_drop_tightening,
    reason = "version_history_lock must be held across upgrade_version and maintenance"
)]
fn move_tables(
    compaction_state: &MutexGuard<'_, CompactionState>,
    opts: &Options,
    payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");

    // Never touch tables that another in-flight compaction has hidden.
    if compaction_state
        .hidden_set()
        .should_decline_compaction(payload.table_ids.iter().copied())
    {
        log::warn!(
            "Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    }

    let table_count = payload.table_ids.len();
    let table_ids = payload.table_ids.iter().copied().collect::<Vec<_>>();

    // Publish a new version with the tables re-parented to the target level.
    version_history_lock.upgrade_version(
        &opts.config.path,
        |current| {
            let mut copy = current.clone();
            let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
            copy.version = copy
                .version
                .with_moved(&table_ids, payload.dest_level as usize, &ctx);
            Ok(copy)
        },
        &opts.global_seqno,
        &opts.visible_seqno,
        &*opts.config.fs,
    )?;

    if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
        log::error!("Manifest maintenance failed: {e:?}");
        return Err(e);
    }

    // A trivial move never changes the number of tables.
    Ok(CompactionResult {
        action: CompactionAction::Moved,
        dest_level: Some(payload.dest_level),
        tables_in: table_count,
        tables_out: table_count,
    })
}
/// Selects blob files to rewrite (relocate) alongside a table merge.
///
/// A blob file is a candidate if it is referenced by a picked table, is
/// stale beyond the configured threshold, and is not fully dead. After
/// sorting by ID and applying the `age_cutoff` fraction, any candidate that
/// is also referenced by a table OUTSIDE the picked set is excluded, since
/// rewriting it would invalidate those other references.
///
/// # Panics
///
/// Panics if a picked table or a referenced blob file is missing from the
/// version (an invariant violation).
fn pick_blob_files_to_rewrite(
    picked_tables: &HashSet<TableId>,
    current_version: &Version,
    blob_opts: &crate::KvSeparationOptions,
) -> crate::Result<Vec<BlobFile>> {
    use crate::Table;

    // Blob file references of all picked tables.
    let linked_blob_files = picked_tables
        .iter()
        .map(|&id| {
            current_version.get_table(id).unwrap_or_else(|| {
                panic!("Table {id} should exist");
            })
        })
        .map(Table::list_blob_file_references)
        .collect::<Result<Vec<_>, _>>()?;

    // Resolve references to blob files and keep only stale-but-not-dead
    // ones; collecting into a set deduplicates shared references.
    let mut linked_blob_files = linked_blob_files
        .into_iter()
        .flatten()
        .flatten()
        .map(|blob_file_ref| {
            current_version
                .blob_files
                .get(blob_file_ref.blob_file_id)
                .unwrap_or_else(|| {
                    panic!("Blob file {} should exist", blob_file_ref.blob_file_id);
                })
        })
        .filter(|blob_file| {
            blob_file.is_stale(current_version.gc_stats(), blob_opts.staleness_threshold)
        })
        .filter(|blob_file| {
            // Fully dead blob files need no rewrite — presumably they are
            // reclaimed by version GC instead (TODO confirm).
            !blob_file.is_dead(current_version.gc_stats())
        })
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();

    // Prefer older (lower ID) blob files.
    linked_blob_files.sort_by_key(|a| a.id());

    #[expect(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        reason = "precision loss and truncation are acceptable for cutoff calculation"
    )]
    let cutoff_point = {
        let len = linked_blob_files.len() as f32;
        (len * blob_opts.age_cutoff) as usize
    };

    // Keep only the oldest `age_cutoff` fraction of the candidates.
    linked_blob_files.drain(cutoff_point..);

    // Drop candidates that are also referenced by tables outside the picked
    // set — rewriting those would break the other tables' references.
    for table in current_version.iter_tables() {
        if picked_tables.contains(&table.id()) {
            continue;
        }

        let other_refs = table
            .list_blob_file_references()?
            .unwrap_or_default()
            .into_iter()
            .filter(|x| linked_blob_files.iter().any(|bf| bf.id() == x.blob_file_id))
            .collect::<Vec<_>>();

        for additional_ref in other_refs {
            linked_blob_files.retain(|x| x.id() != additional_ref.blob_file_id);
        }
    }

    Ok(linked_blob_files.into_iter().cloned().collect::<Vec<_>>())
}
/// Runs `f`; on failure, logs the error and un-hides the payload's tables
/// again so they become eligible for future compactions.
fn hidden_guard<T>(
    payload: &CompactionPayload,
    opts: &Options,
    f: impl FnOnce() -> crate::Result<T>,
) -> crate::Result<T> {
    match f() {
        Ok(value) => Ok(value),
        Err(e) => {
            log::error!("Compaction failed: {e:?}");

            #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
            let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned");

            compaction_state
                .hidden_set_mut()
                .show(payload.table_ids.iter().copied());

            Err(e)
        }
    }
}
/// Merges the payload's tables into new output tables on the destination level.
///
/// Takes ownership of both guards: the version history read lock is dropped
/// before the (potentially long) merge loop runs, and the compaction state
/// guard is dropped after hiding the input tables. Both locks are re-acquired
/// afterwards (state first, then version history) to publish the result.
#[expect(clippy::too_many_lines)]
fn merge_tables(
    mut compaction_state: MutexGuard<'_, CompactionState>,
    version_history_lock: RwLockReadGuard<'_, SuperVersions>,
    opts: &Options,
    payload: &CompactionPayload,
) -> crate::Result<CompactionResult> {
    if opts.stop_signal.is_stopped() {
        log::debug!("Stopping before compaction because of stop signal");
        return Ok(CompactionResult::nothing());
    }

    // Never touch tables that another in-flight compaction has hidden.
    if compaction_state
        .hidden_set()
        .should_decline_compaction(payload.table_ids.iter().copied())
    {
        log::warn!(
            "Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    }

    let current_super_version = version_history_lock.latest_version();

    // Resolve all table IDs up front; bail out if any are unknown.
    let Some(tables) = payload
        .table_ids
        .iter()
        .map(|&id| current_super_version.version.get_table(id).cloned())
        .collect::<Option<Vec<_>>>()
    else {
        log::warn!(
            "Compaction task created by {:?} contained tables not referenced in the level manifest",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    };

    let tables_in = payload.table_ids.len();

    // Collect the input tables' range tombstones (deduplicated) so they can
    // be carried over into the compaction output.
    let mut input_range_tombstones: Vec<crate::range_tombstone::RangeTombstone> = tables
        .iter()
        .flat_map(|t| t.range_tombstones().iter().cloned())
        .collect();
    input_range_tombstones.sort();
    input_range_tombstones.dedup();

    // Accumulates blob fragmentation caused by values dropped during the merge.
    let mut blob_frag_map = FragmentationMap::default();

    let Some(mut merge_iter) = create_compaction_stream(
        &current_super_version.version,
        &payload.table_ids.iter().copied().collect::<Vec<_>>(),
        opts.mvcc_gc_watermark,
        opts.config.merge_operator.clone(),
        opts.config.comparator.clone(),
    )?
    else {
        log::warn!(
            "Compaction task tried to compact tables that do not exist, declining to run it"
        );
        return Ok(CompactionResult::nothing());
    };

    let dst_lvl = payload.canonical_level.into();
    let is_last_level = payload.dest_level == opts.config.level_count - 1;

    // Tombstones may only be evicted on the last level, where nothing
    // deeper can still be shadowed by them.
    merge_iter = merge_iter
        .evict_tombstones(is_last_level)
        .zero_seqnos(false);

    let blobs_folder = opts.config.path.join(BLOBS_FOLDER);

    let filter_ctx = Context { is_last_level };

    // Optional user-provided compaction filter.
    let mut compaction_filter = opts.config.compaction_filter_factory.as_ref().map(|f| {
        log::trace!("Installing custom compaction filter {:?}", f.name());
        f.make_filter(&filter_ctx)
    });

    // Populated lazily by the filter adapter if it needs to write blobs.
    let mut filter_blob_writer = None;

    let mut merge_iter = merge_iter.with_filter(StreamFilterAdapter::new(
        compaction_filter.as_deref_mut(),
        opts,
        &current_super_version.version,
        &blobs_folder,
        &mut filter_blob_writer,
        &filter_ctx,
    ));

    let table_writer =
        super::flavour::prepare_table_writer(&current_super_version.version, opts, payload)?;

    let start = Instant::now();

    // Pick the compaction flavour: plain table rewrite, or one that also
    // relocates stale blob files.
    let mut compactor = match &opts.config.kv_separation_opts {
        Some(blob_opts) => {
            merge_iter = merge_iter.with_drop_callback(&mut blob_frag_map);

            let blob_files_to_rewrite = pick_blob_files_to_rewrite(
                &payload.table_ids,
                &current_super_version.version,
                blob_opts,
            )?;

            if blob_files_to_rewrite.is_empty() {
                log::debug!("No blob relocation needed");

                Box::new(StandardCompaction::new(table_writer, tables))
                    as Box<dyn super::flavour::CompactionFlavour>
            } else {
                log::debug!(
                    "Relocate blob files: {:?}",
                    blob_files_to_rewrite
                        .iter()
                        .map(BlobFile::id)
                        .collect::<Vec<_>>(),
                );

                let scanner = BlobFileMergeScanner::new(
                    blob_files_to_rewrite
                        .iter()
                        .map(|bf| BlobFileScanner::new(&bf.0.path, bf.id()))
                        .collect::<crate::Result<Vec<_>>>()?,
                );

                let writer = BlobFileWriter::new(
                    opts.blob_file_id_generator.clone(),
                    &blobs_folder,
                    opts.tree_id,
                    opts.config.descriptor_table.clone(),
                    opts.config.fs.clone(),
                )?
                .use_target_size(blob_opts.file_target_size)
                .use_passthrough_compression(blob_opts.compression);

                let inner = StandardCompaction::new(table_writer, tables);

                Box::new(RelocatingCompaction::new(
                    inner,
                    scanner.peekable(),
                    writer,
                    blob_files_to_rewrite,
                ))
            }
        }
        None => Box::new(StandardCompaction::new(table_writer, tables)),
    };

    log::trace!("Blob file GC preparation done in {:?}", start.elapsed());

    // Release the read lock for the duration of the merge so other work
    // on the version history is not blocked.
    drop(version_history_lock);

    // Hide the input tables from other compactions while we work on them.
    {
        compaction_state
            .hidden_set_mut()
            .hide(payload.table_ids.iter().copied());
    }

    drop(compaction_state);

    // The actual merge loop; on error, hidden_guard un-hides the tables.
    hidden_guard(payload, opts, || {
        if !input_range_tombstones.is_empty() {
            log::debug!(
                "Propagating {} range tombstones to compaction output",
                input_range_tombstones.len(),
            );
            compactor.write_range_tombstones(&input_range_tombstones);
        }

        for (idx, item) in merge_iter.enumerate() {
            let item = item?;

            compactor.write(item)?;

            // Check the stop signal periodically (every 1M items).
            if idx % 1_000_000 == 0 && opts.stop_signal.is_stopped() {
                log::debug!("Stopping amidst compaction because of stop signal");
                return Ok(());
            }
        }

        Ok(())
    })?;

    if let Some(filter) = compaction_filter {
        filter.finish();
    }

    // Re-acquire both locks — state first, then version history — to publish.
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let mut compaction_state = opts.compaction_state.lock().expect("lock is poisoned");

    log::trace!("Acquiring super version write lock");
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");
    log::trace!("Acquired super version write lock");

    log::trace!("Blob fragmentation diff: {blob_frag_map:#?}");

    // Blob files written by the compaction filter (if any); on error, the
    // hidden tables are made visible again before bailing out.
    let extra_blob_files = filter_blob_writer
        .map(BlobFileWriter::finish)
        .transpose()
        .inspect_err(|e| {
            log::error!("Compaction failed while finishing filter blob writer: {e:?}");

            compaction_state
                .hidden_set_mut()
                .show(payload.table_ids.iter().copied());
        })?
        .unwrap_or_default();

    let tables_out = compactor
        .finish(
            &mut version_history_lock,
            opts,
            payload,
            dst_lvl,
            blob_frag_map,
            extra_blob_files,
        )
        .inspect_err(|e| {
            log::error!("Compaction failed: {e:?}");

            compaction_state
                .hidden_set_mut()
                .show(payload.table_ids.iter().copied());
        })?;

    // Success: un-hide the (now replaced) input tables.
    compaction_state
        .hidden_set_mut()
        .show(payload.table_ids.iter().copied());

    version_history_lock
        .maintenance(&opts.config.path, opts.mvcc_gc_watermark)
        .inspect_err(|e| {
            log::error!("Manifest maintenance failed: {e:?}");
        })?;

    drop(version_history_lock);
    drop(compaction_state);

    log::trace!("Compaction successful");

    Ok(CompactionResult {
        action: CompactionAction::Merged,
        dest_level: Some(payload.dest_level),
        tables_in,
        tables_out,
    })
}
/// Removes tables (and blob files dropped alongside them) from the tree.
fn drop_tables(
    compaction_state: MutexGuard<'_, CompactionState>,
    opts: &Options,
    ids_to_drop: &[TableId],
) -> crate::Result<CompactionResult> {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let mut version_history_lock = opts.version_history.write().expect("lock is poisoned");

    // Never touch tables that another in-flight compaction has hidden.
    if compaction_state
        .hidden_set()
        .should_decline_compaction(ids_to_drop.iter().copied())
    {
        log::warn!(
            "Compaction task created by {:?} contained hidden tables, declining to run it - please report this at https://github.com/fjall-rs/lsm-tree/issues/new?template=bug_report.md",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    }

    // Resolve all IDs up front; bail out if any table is unknown.
    let Some(tables) = ids_to_drop
        .iter()
        .map(|&id| {
            version_history_lock
                .latest_version()
                .version
                .get_table(id)
                .cloned()
        })
        .collect::<Option<Vec<_>>>()
    else {
        log::warn!(
            "Compaction task created by {:?} contained tables not referenced in the level manifest",
            opts.strategy.get_name(),
        );
        return Ok(CompactionResult::nothing());
    };

    log::debug!("Dropping tables: {ids_to_drop:?}");

    let mut dropped_blob_files = vec![];

    // Publish a new version without the dropped tables; `with_dropped`
    // reports the blob files that were dropped with them.
    version_history_lock.upgrade_version(
        &opts.config.path,
        |current| {
            let mut copy = current.clone();
            let ctx = crate::version::TransformContext::new(opts.config.comparator.as_ref());
            copy.version = copy
                .version
                .with_dropped(ids_to_drop, &mut dropped_blob_files, &ctx)?;
            Ok(copy)
        },
        &opts.global_seqno,
        &opts.visible_seqno,
        &*opts.config.fs,
    )?;

    if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
        log::error!("Manifest maintenance failed: {e:?}");
        return Err(e);
    }

    drop(version_history_lock);

    // Only mark files for deletion after the new version is published.
    for table in tables {
        table.mark_as_deleted();
    }

    for blob_file in dropped_blob_files {
        blob_file.mark_as_deleted();
    }

    let tables_dropped = ids_to_drop.len();

    drop(compaction_state);

    log::trace!("Dropped {tables_dropped} tables");

    Ok(CompactionResult {
        action: CompactionAction::Dropped,
        dest_level: None,
        tables_in: tables_dropped,
        tables_out: 0,
    })
}
#[cfg(test)]
mod tests {
    use super::{create_compaction_stream, pick_run_indexes};
    use crate::{
        AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter, TableId,
        compaction::{Choice, CompactionStrategy, Input, state::CompactionState},
        config::BlockSizePolicy,
        version::Version,
    };
    use std::sync::Arc;
    use test_log::test;

    // `create_compaction_stream` must return None for unknown table IDs.
    #[test]
    fn compaction_stream_run_not_found() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;

        assert!(
            create_compaction_stream(
                &tree.current_version(),
                &[666],
                0,
                None,
                crate::comparator::default_comparator()
            )?
            .is_none()
        );

        Ok(())
    }

    // Picking all tables of a run yields the full index window.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn compaction_stream_run() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("b", "b", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("c", "c", 0);
        tree.flush_active_memtable(0)?;

        assert_eq!(
            Some((0, 2)),
            pick_run_indexes(
                tree.current_version()
                    .level(0)
                    .unwrap()
                    .iter()
                    .next()
                    .unwrap(),
                &[0, 1, 2],
            )
        );

        Ok(())
    }

    // Picking only the first table yields a single-element window at index 0.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn compaction_stream_run_2() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("b", "b", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("c", "c", 0);
        tree.flush_active_memtable(0)?;

        assert_eq!(
            Some((0, 0)),
            pick_run_indexes(
                tree.current_version()
                    .level(0)
                    .unwrap()
                    .iter()
                    .next()
                    .unwrap(),
                &[0],
            )
        );

        Ok(())
    }

    // Picking only the last table yields a single-element window at index 2.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn compaction_stream_run_3() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("b", "b", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("c", "c", 0);
        tree.flush_active_memtable(0)?;

        assert_eq!(
            Some((2, 2)),
            pick_run_indexes(
                tree.current_version()
                    .level(0)
                    .unwrap()
                    .iter()
                    .next()
                    .unwrap(),
                &[2],
            )
        );

        Ok(())
    }

    // Picking an unknown table ID yields no window at all.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn compaction_stream_run_4() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("b", "b", 0);
        tree.flush_active_memtable(0)?;

        tree.insert("c", "c", 0);
        tree.flush_active_memtable(0)?;

        assert_eq!(
            None,
            pick_run_indexes(
                tree.current_version()
                    .level(0)
                    .unwrap()
                    .iter()
                    .next()
                    .unwrap(),
                &[4],
            )
        );

        Ok(())
    }

    // FIFO compaction with a tiny size limit ends up dropping every table.
    #[test]
    fn compaction_drop_tables() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;

        tree.insert("a", "a", 0);
        tree.flush_active_memtable(0)?;
        assert_eq!(1, tree.approximate_len());
        assert_eq!(0, tree.sealed_memtable_count());

        tree.insert("b", "a", 1);
        tree.flush_active_memtable(0)?;
        assert_eq!(2, tree.approximate_len());
        assert_eq!(0, tree.sealed_memtable_count());

        tree.insert("c", "a", 2);
        tree.flush_active_memtable(0)?;
        assert_eq!(3, tree.approximate_len());
        assert_eq!(0, tree.sealed_memtable_count());

        tree.compact(Arc::new(crate::compaction::Fifo::new(1, None)), 3)?;
        assert_eq!(0, tree.table_count());

        Ok(())
    }

    // End-to-end check of blob file picking and GC stats during in-place
    // (same-level) compaction of a KV-separated tree.
    #[test]
    fn blob_file_picking_simple() -> crate::Result<()> {
        // Strategy that always merges exactly the given tables into level 6.
        struct InPlaceStrategy(Vec<TableId>);

        impl CompactionStrategy for InPlaceStrategy {
            fn get_name(&self) -> &'static str {
                "InPlaceCompaction"
            }

            fn choose(&self, _: &Version, _: &Config, _: &CompactionState) -> Choice {
                Choice::Merge(Input {
                    table_ids: self.0.iter().copied().collect(),
                    dest_level: 6,
                    target_size: 64_000_000,
                    canonical_level: 6,
                })
            }
        }

        let folder = tempfile::tempdir()?;

        let tree = crate::Config::new(
            folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .data_block_size_policy(BlockSizePolicy::all(1))
        .with_kv_separation(Some(
            KvSeparationOptions::default()
                .separation_threshold(1)
                .age_cutoff(1.0)
                .staleness_threshold(0.01)
                .compression(crate::CompressionType::None),
        ))
        .open()?;

        tree.insert("a", "a", 0);
        tree.insert("b", "b", 0);
        tree.insert("c", "c", 0);
        tree.flush_active_memtable(1_000)?;
        assert_eq!(0, tree.sealed_memtable_count());
        assert_eq!(1, tree.table_count());
        assert_eq!(1, tree.blob_file_count());

        tree.major_compact(1, 1_000)?;
        assert_eq!(3, tree.table_count());
        assert_eq!(1, tree.blob_file_count());

        tree.drop_range("a"..="a")?;
        assert_eq!(2, tree.table_count());
        assert_eq!(1, tree.blob_file_count());

        {
            assert_eq!(
                &{
                    let mut map = crate::HashMap::default();
                    map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
                    map
                },
                &**tree.current_version().gc_stats(),
            );
        }

        // In-place compaction of one table must not change GC stats.
        tree.compact(Arc::new(InPlaceStrategy(vec![2])), 1_000)?;
        assert_eq!(2, tree.table_count());
        assert_eq!(1, tree.blob_file_count());

        {
            assert_eq!(
                &{
                    let mut map = crate::HashMap::default();
                    map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
                    map
                },
                &**tree.current_version().gc_stats(),
            );
        }

        // Compacting the remaining tables rewrites the blob file, clearing stats.
        tree.compact(Arc::new(InPlaceStrategy(vec![3, 4])), 1_000)?;
        assert_eq!(1, tree.table_count());
        assert_eq!(1, tree.blob_file_count());

        {
            assert_eq!(
                crate::HashMap::default(),
                **tree.current_version().gc_stats(),
            );
        }

        Ok(())
    }
}