use super::{create_compaction_stream, pick_run_indexes};
use crate::{
AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter, Table, TableId,
compaction::{Choice, CompactionStrategy, Input, state::CompactionState},
config::BlockSizePolicy,
version::Version,
};
use std::sync::Arc;
use test_log::test;
struct FirstByteComparator;
impl crate::comparator::UserComparator for FirstByteComparator {
fn name(&self) -> &'static str {
"test-first-byte"
}
fn compare(&self, a: &[u8], b: &[u8]) -> core::cmp::Ordering {
a.first().cmp(&b.first())
}
}
#[test]
fn boundary_candidates_dedups_comparator_equal_keys() {
let cmp: crate::comparator::SharedComparator = Arc::new(FirstByteComparator);
let keys = vec![
crate::UserKey::from("a1"),
crate::UserKey::from("a2"),
crate::UserKey::from("b1"),
];
let out = super::boundary_candidates(keys, &cmp);
assert_eq!(
out.len(),
1,
"comparator-equal keys must collapse to a single boundary candidate",
);
assert_eq!(
out.first().and_then(|k| k.first()),
Some(&b'a'),
"the surviving boundary should be from the deduped a-group",
);
}
#[cfg(feature = "parallel")]
#[test]
fn failed_subcompaction_rolls_back_and_restores_inputs() -> crate::Result<()> {
use core::sync::atomic::Ordering;
const N: u64 = 4_000;
let key = |i: u64| format!("key_{i:08}");
let val = |i: u64, generation: u64| format!("g{generation}-{i}-{}", "x".repeat(40));
let dir = tempfile::tempdir()?;
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.with_kv_separation(Some(
KvSeparationOptions::default().separation_threshold(16),
));
let failpoint = config.fail_one_subcompaction.clone();
let tree = config.open()?;
for i in 0..N {
tree.insert(key(i), val(i, 0), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(4_096, 0)?;
for i in 0..N {
tree.insert(key(i), val(i, 1), N + i);
}
tree.flush_active_memtable(0)?;
let tables_before = tree.table_count();
failpoint.store(true, Ordering::SeqCst);
let result = tree.major_compact(u64::MAX, 0);
assert!(
result.is_err(),
"a failing sub-compaction range must abort the compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed itself",
);
assert_eq!(
tree.table_count(),
tables_before,
"rollback must leave nothing partially installed",
);
for i in 0..N {
assert_eq!(
tree.get(key(i), crate::MAX_SEQNO)?.as_deref(),
Some(val(i, 1).as_bytes()),
"value for {} must survive the rolled-back compaction",
key(i),
);
}
Ok(())
}
#[test]
fn tight_space_crash_after_first_slice_recovers_all_keys_on_reopen() -> crate::Result<()> {
use core::sync::atomic::Ordering;
const N: u64 = 2_000;
let k = |i: u64| format!("key{i:08}");
let dir = tempfile::tempdir()?;
let mem = crate::fs::MemFs::with_capacity(u64::MAX);
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.with_shared_fs(Arc::new(mem.clone()));
let failpoint = config.fail_tight_after_first_slice.clone();
let tree = match config.open()? {
crate::AnyTree::Standard(t) => t,
crate::AnyTree::Blob(_) => panic!("expected Standard tree"),
};
for i in 0..N {
tree.insert(k(i).as_bytes(), vec![0xCDu8; 64], i);
}
tree.flush_active_memtable(0)?;
let used = tree.storage_stats()?.used_bytes;
mem.set_capacity(used + used / 4);
tree.update_runtime_config(|c| {
c.storage_admission_check = true;
c.tight_space_compaction = true;
})?;
failpoint.store(true, Ordering::SeqCst);
assert!(
tree.major_compact(64 * 1024 * 1024, 0).is_err(),
"the crash failpoint must abort the tight-space compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed",
);
assert!(
mem.punched_bytes() > 0,
"the first slice must have punched before the crash",
);
drop(tree);
let reopened = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(mem))
.open()?;
for i in 0..N {
assert!(
reopened.get(k(i).as_bytes(), crate::MAX_SEQNO)?.is_some(),
"key {i} lost after a crash mid tight-space compaction + reopen",
);
}
Ok(())
}
#[test]
fn tight_space_blob_relocation_crash_after_first_slice_recovers_all_keys() -> crate::Result<()> {
use core::sync::atomic::Ordering;
const N: u64 = 4_000;
let k = |i: u64| format!("key{i:08}");
let val = |i: u64, generation: u8| -> Vec<u8> {
let mut s = (i + 1).wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ (u64::from(generation) << 1);
(0..200u32)
.map(|_| {
s ^= s << 13;
s ^= s >> 7;
s ^= s << 17;
#[expect(
clippy::cast_possible_truncation,
reason = "xorshift byte extraction; the high bits are intentionally dropped"
)]
let byte = (s >> 24) as u8;
byte
})
.collect()
};
let dir = tempfile::tempdir()?;
let mem = crate::fs::MemFs::with_capacity(u64::MAX);
let config = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.with_shared_fs(Arc::new(mem.clone()))
.with_kv_separation(Some(
KvSeparationOptions::default()
.separation_threshold(64)
.age_cutoff(1.0)
.staleness_threshold(0.1)
.file_target_size(48 * 1024),
));
let failpoint = config.fail_tight_after_first_slice.clone();
let tree = match config.open()? {
crate::AnyTree::Blob(t) => t,
crate::AnyTree::Standard(_) => panic!("expected Blob tree"),
};
for i in 0..N {
tree.insert(k(i).as_bytes(), val(i, 1), i);
}
tree.flush_active_memtable(0)?;
for i in (0..N).step_by(2) {
tree.insert(k(i).as_bytes(), val(i, 2), N + i);
}
tree.flush_active_memtable(0)?;
let gc_watermark = 4 * N;
tree.index.update_runtime_config(|c| {
c.storage_admission_check = true;
c.storage_limit_bytes = None;
})?;
tree.major_compact(64 * 1024 * 1024, gc_watermark)?;
let used = tree.storage_stats()?.used_bytes;
mem.set_capacity(used + used / 4);
tree.index.update_runtime_config(|c| {
c.tight_space_compaction = true;
})?;
failpoint.store(true, Ordering::SeqCst);
assert!(
tree.major_compact(64 * 1024 * 1024, gc_watermark).is_err(),
"the crash failpoint must abort the relocating tight-space compaction",
);
assert!(
!failpoint.load(Ordering::SeqCst),
"the failpoint should have fired and disarmed",
);
assert!(
mem.punched_bytes() > 0,
"the first relocated slice must have punched a stale blob prefix",
);
drop(tree);
let reopened = match Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_kv_separation(Some(
KvSeparationOptions::default().separation_threshold(64),
))
.with_shared_fs(Arc::new(mem))
.open()?
{
crate::AnyTree::Blob(t) => t,
crate::AnyTree::Standard(_) => panic!("expected Blob tree"),
};
for i in 0..N {
let expected = if i % 2 == 0 { val(i, 2) } else { val(i, 1) };
assert_eq!(
reopened.get(k(i).as_bytes(), crate::MAX_SEQNO)?.as_deref(),
Some(expected.as_slice()),
"key {i} wrong/lost after a crash mid blob-relocation + reopen",
);
}
Ok(())
}
#[test]
fn last_level_applies_and_gcs_below_watermark_range_tombstone() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.open()?;
let key = |i: u64| format!("k{i:04}");
let val = |i: u64| format!("v{i}-{}", "x".repeat(40));
for i in 0..200u64 {
tree.insert(key(i), val(i), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(4_096, 0)?;
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0050"),
1000,
);
for i in 50..200u64 {
tree.insert(key(i), val(i), 1001 + i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(u64::MAX, 5000)?;
for i in 0..50u64 {
assert_eq!(
tree.get(key(i), crate::MAX_SEQNO)?,
None,
"covered key {} must be physically gone after GC",
key(i),
);
}
for i in 50..200u64 {
assert!(
tree.get(key(i), crate::MAX_SEQNO)?.is_some(),
"uncovered key {} must survive",
key(i),
);
}
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
remaining.is_empty(),
"a fully-applied below-watermark tombstone must be GC'd, found {remaining:?}",
);
Ok(())
}
#[test]
fn above_watermark_range_tombstone_is_retained() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
let key = |i: u64| format!("k{i:04}");
for i in 0..50u64 {
tree.insert(key(i), "v", i);
}
tree.flush_active_memtable(0)?;
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0025"),
100,
);
tree.flush_active_memtable(0)?;
tree.major_compact(u64::MAX, 50)?;
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
!remaining.is_empty(),
"an above-watermark tombstone must be retained, not GC'd",
);
Ok(())
}
#[test]
fn range_tombstone_at_exact_watermark_is_not_applied_or_gced() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.compaction_threads(4)
.subcompaction_min_bytes(0)
.open()?;
let key = |i: u64| format!("k{i:04}");
let val = |i: u64| format!("v{i}-{}", "x".repeat(40));
for i in 0..200u64 {
tree.insert(key(i), val(i), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(4_096, 0)?;
tree.remove_range(
crate::UserKey::from("k0000"),
crate::UserKey::from("k0050"),
1000,
);
for i in 50..200u64 {
tree.insert(key(i), val(i), 1001 + i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(u64::MAX, 1000)?;
for i in 0..50u64 {
assert_eq!(
tree.get(key(i), 1000)?.as_deref(),
Some(val(i).as_bytes()),
"covered key {} must survive: RT@watermark is invisible at read==watermark",
key(i),
);
}
let remaining = super::collect_version_tombstones(&tree.current_version());
assert!(
!remaining.is_empty(),
"a tombstone at the exact watermark must be retained, not GC'd",
);
Ok(())
}
#[test]
fn compaction_stream_run_not_found() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
assert!(
create_compaction_stream(
&tree.current_version(),
&[666],
0,
None,
crate::comparator::default_comparator()
)?
.is_none()
);
Ok(())
}
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((0, 2)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[0, 1, 2],
)
);
Ok(())
}
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_2() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((0, 0)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[0],
)
);
Ok(())
}
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_3() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
Some((2, 2)),
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[2],
)
);
Ok(())
}
#[test]
#[expect(clippy::unwrap_used)]
fn compaction_stream_run_4() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "b", 0);
tree.flush_active_memtable(0)?;
tree.insert("c", "c", 0);
tree.flush_active_memtable(0)?;
assert_eq!(
None,
pick_run_indexes(
tree.current_version()
.level(0)
.unwrap()
.iter()
.next()
.unwrap(),
&[4],
)
);
Ok(())
}
#[test]
fn compaction_drop_tables() -> crate::Result<()> {
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "a", 0);
tree.flush_active_memtable(0)?;
assert_eq!(1, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.insert("b", "a", 1);
tree.flush_active_memtable(0)?;
assert_eq!(2, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.insert("c", "a", 2);
tree.flush_active_memtable(0)?;
assert_eq!(3, tree.approximate_len());
assert_eq!(0, tree.sealed_memtable_count());
tree.compact(Arc::new(crate::compaction::Fifo::new(1, None)), 3)?;
assert_eq!(0, tree.table_count());
Ok(())
}
#[test]
fn blob_file_picking_simple() -> crate::Result<()> {
struct InPlaceStrategy(Vec<TableId>);
impl CompactionStrategy for InPlaceStrategy {
fn get_name(&self) -> &'static str {
"InPlaceCompaction"
}
fn choose(&self, _: &Version, _: &Config, _: &CompactionState) -> Choice {
Choice::Merge(Input {
table_ids: self.0.iter().copied().collect(),
dest_level: 6,
target_size: 64_000_000,
canonical_level: 6, })
}
}
let folder = tempfile::tempdir()?;
let tree = crate::Config::new(
folder,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(1))
.with_kv_separation(Some(
KvSeparationOptions::default()
.separation_threshold(1)
.age_cutoff(1.0)
.staleness_threshold(0.01)
.compression(crate::CompressionType::None),
))
.open()?;
tree.insert("a", "a", 0);
tree.insert("b", "b", 0);
tree.insert("c", "c", 0);
tree.flush_active_memtable(1_000)?;
assert_eq!(0, tree.sealed_memtable_count());
assert_eq!(1, tree.table_count());
assert_eq!(1, tree.blob_file_count());
tree.major_compact(1, 1_000)?;
assert_eq!(3, tree.table_count());
assert_eq!(1, tree.blob_file_count());
tree.drop_range("a"..="a")?;
assert_eq!(2, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
&{
let mut map = crate::HashMap::default();
map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
map
},
&**tree.current_version().gc_stats(),
);
}
tree.compact(Arc::new(InPlaceStrategy(vec![2])), 1_000)?;
assert_eq!(2, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
&{
let mut map = crate::HashMap::default();
map.insert(0, crate::blob_tree::FragmentationEntry::new(1, 1, 1));
map
},
&**tree.current_version().gc_stats(),
);
}
tree.compact(Arc::new(InPlaceStrategy(vec![3, 4])), 1_000)?;
assert_eq!(1, tree.table_count());
assert_eq!(1, tree.blob_file_count());
{
assert_eq!(
crate::HashMap::default(),
**tree.current_version().gc_stats(),
);
}
Ok(())
}
#[expect(
clippy::expect_used,
clippy::indexing_slicing,
reason = "test asserts over known-good fixtures; failure surfaces via panic"
)]
#[test]
fn narrow_merge_candidates_for_full_run_are_adjacent_pairs_sorted_ascending() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_size_policy(BlockSizePolicy::all(512))
.open()?;
for i in 0..3_000u64 {
tree.insert(format!("k{i:08}"), "v".repeat(40), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(16 * 1024, 0)?;
let version = tree.current_version();
let run = version
.iter_levels()
.flat_map(|level| level.iter())
.find(|run| run.len() >= 3)
.expect("a bottom-level run with >= 3 tables");
let ordered: Vec<(TableId, u64)> = run.iter().map(|t| (t.id(), t.file_size())).collect();
let payload = Input {
table_ids: ordered.iter().map(|(id, _)| *id).collect(),
dest_level: 6,
canonical_level: 6,
target_size: 64 * 1024 * 1024,
};
let candidates = super::narrow_merge_candidates(&version, &payload);
assert_eq!(
candidates.len(),
ordered.len() - 1,
"one candidate per run-adjacent pair"
);
for c in &candidates {
assert_eq!(c.table_ids.len(), 2, "each candidate is an adjacent pair");
assert_eq!(c.dest_level, 6, "destination preserved");
}
let combined = |c: &Input| -> u64 {
c.table_ids
.iter()
.filter_map(|id| version.get_table(*id))
.map(Table::file_size)
.sum()
};
let sums: Vec<u64> = candidates.iter().map(combined).collect();
let mut sorted = sums.clone();
sorted.sort_unstable();
assert_eq!(sums, sorted, "candidates sorted ascending by SST size");
let smallest_pair = ordered
.windows(2)
.map(|w| w[0].1 + w[1].1)
.min()
.expect(">= 2 tables");
assert_eq!(sums[0], smallest_pair, "smallest-Σ pair is tried first");
Ok(())
}
#[test]
fn space_fits_two_layer_combines_shared_volume_outputs_and_separates_routed_ones()
-> crate::Result<()> {
use crate::fs::MemFs;
const MIB: u64 = 1024 * 1024;
let dir = tempfile::tempdir()?;
let cfg = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(MemFs::with_capacity(100 * MIB)));
assert!(
!super::space_fits_two_layer(&cfg, u64::MAX, 60 * MIB, 6, 60 * MIB),
"shared-volume outputs must be summed, not checked independently"
);
assert!(super::space_fits_two_layer(
&cfg,
u64::MAX,
60 * MIB,
6,
30 * MIB
));
assert!(!super::space_fits_two_layer(
&cfg,
80 * MIB,
50 * MIB,
6,
40 * MIB
));
let routed = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(MemFs::with_capacity(100 * MIB)))
.level_routes(vec![crate::config::LevelRoute {
levels: 6..7,
path: crate::path::PathBuf::from("/cold-tier"),
fs: Arc::new(MemFs::with_capacity(100 * MIB)),
}]);
assert!(
super::space_fits_two_layer(&routed, u64::MAX, 60 * MIB, 6, 60 * MIB),
"proven-independent volumes are checked independently"
);
assert!(!super::space_fits_two_layer(
&routed,
u64::MAX,
60 * MIB,
6,
130 * MIB
));
let shared: Arc<dyn crate::fs::Fs> = Arc::new(MemFs::with_capacity(100 * MIB));
let routed_same_mount = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::clone(&shared))
.level_routes(vec![crate::config::LevelRoute {
levels: 6..7,
path: crate::path::PathBuf::from("/same-mount-subdir"),
fs: Arc::clone(&shared),
}]);
assert!(
!super::space_fits_two_layer(&routed_same_mount, u64::MAX, 60 * MIB, 6, 60 * MIB),
"a route on the same volume must combine budgets, not admit each independently"
);
Ok(())
}
#[expect(
clippy::expect_used,
reason = "test asserts over known-good fixtures; failure surfaces via panic"
)]
#[test]
fn space_gate_for_merge_narrows_a_full_run_that_exceeds_free() -> crate::Result<()> {
use crate::fs::MemFs;
let dir = tempfile::tempdir()?;
let mem = MemFs::with_capacity(u64::MAX);
let any = Config::new(
&dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_shared_fs(Arc::new(mem.clone()))
.data_block_size_policy(BlockSizePolicy::all(512))
.open()?;
let crate::AnyTree::Standard(tree) = any else {
panic!("expected Standard tree");
};
for i in 0..3_000u64 {
tree.insert(format!("k{i:08}"), "v".repeat(40), i);
}
tree.flush_active_memtable(0)?;
tree.major_compact(16 * 1024, 0)?;
let version = tree.current_version();
let run = version
.iter_levels()
.flat_map(|level| level.iter())
.find(|run| run.len() >= 3)
.expect("a bottom-level run with >= 3 tables");
let run_sigma: u64 = run.iter().map(Table::file_size).sum();
let payload = Input {
table_ids: run.iter().map(Table::id).collect(),
dest_level: 6,
canonical_level: 6,
target_size: 64 * 1024 * 1024,
};
let probe_capacity = 1u64 << 40;
mem.set_capacity(probe_capacity);
let stored =
probe_capacity - crate::fs::Fs::available_space(&mem, dir.path()).unwrap_or(probe_capacity);
mem.set_capacity(stored + run_sigma - 1);
tree.update_runtime_config(|c| {
c.storage_admission_check = true;
c.storage_limit_bytes = None;
})?;
let opts = super::Options::from_tree(
&tree,
Arc::new(crate::compaction::major::Strategy::new(64 * 1024 * 1024)),
);
match super::space_gate_for_merge(&version, &opts, &payload)? {
super::SpaceGate::Narrowed(narrowed) => {
assert_eq!(narrowed.table_ids.len(), 2, "narrowed to an adjacent pair");
}
super::SpaceGate::Run => {
panic!("expected Narrowed, got Run (full run wrongly admitted)")
}
super::SpaceGate::Skip => panic!("expected Narrowed, got Skip (no pair admitted)"),
}
Ok(())
}