use super::*;
use crate::{AbstractTree, Config, MAX_SEQNO, SequenceNumberCounter};
use std::sync::Arc;
use test_log::test;
/// Compacts a freshly opened tree with the default leveled strategy and
/// checks that the table count is still zero afterwards.
#[test]
fn leveled_empty_levels() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    let leveled = Arc::new(Strategy::default());
    tree.compact(leveled, 0)?;

    let remaining_tables = tree.table_count();
    assert_eq!(0, remaining_tables);

    Ok(())
}
/// Flushes three tables (one distinct key each), compacts with the default
/// strategy, and asserts that the table count is the same before and after.
#[test]
fn leveled_l0_below_limit() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    // One flush per key; the assertion below confirms this yields three tables.
    for suffix in 0..3u8 {
        let key = [b'k', suffix];
        tree.insert(key.as_slice(), "v", 0);
        tree.flush_active_memtable(0)?;
    }

    let tables_before = tree.table_count();
    assert_eq!(3, tables_before);

    let leveled = Arc::new(Strategy::default());
    tree.compact(leveled, 0)?;

    let tables_after = tree.table_count();
    assert_eq!(tables_before, tables_after);

    Ok(())
}
/// Flushes three tables that all contain the keys "a" and "z" (plus one
/// unique key each), compacts with an L0 threshold of 4 and a large table
/// target size, and asserts that:
///
/// - L0 goes from more than one run to exactly one run,
/// - the three tables become a single table,
/// - every unique key is still readable and "a"/"z" return the last value
///   written,
/// - level 1 holds no tables.
#[test]
fn leveled_intra_l0_compaction() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    for round in 0..3u8 {
        let seqno = u64::from(round);
        let value = [b'v', round];

        // "a" and "z" are rewritten in every table; the `k` key is unique.
        tree.insert("a", value.as_slice(), seqno);
        tree.insert([b'k', round].as_slice(), "v", 0);
        tree.insert("z", value.as_slice(), seqno);
        tree.flush_active_memtable(0)?;
    }

    assert_eq!(3, tree.table_count());
    let runs_before = tree.l0_run_count();
    assert!(runs_before > 1, "L0 should have multiple overlapping runs");

    let target_size = 128 * 1024 * 1024;
    let strategy = Arc::new(
        Strategy::default()
            .with_l0_threshold(4)
            .with_table_target_size(target_size),
    );
    tree.compact(strategy, 0)?;

    let runs_after = tree.l0_run_count();
    assert_eq!(
        1, runs_after,
        "L0 should have exactly 1 run after intra-L0 compaction"
    );
    let tables_after = tree.table_count();
    assert_eq!(
        1, tables_after,
        "Tables should be merged into 1 after intra-L0 compaction"
    );

    for suffix in 0..3u8 {
        let found = tree.get([b'k', suffix].as_slice(), MAX_SEQNO)?;
        assert!(found.is_some());
    }

    // The last round wrote [b'v', 2] to both shared keys.
    let newest = [b'v', 2];
    let a_value = tree.get("a", MAX_SEQNO)?;
    assert_eq!(a_value.as_deref(), Some(newest.as_slice()));
    let z_value = tree.get("z", MAX_SEQNO)?;
    assert_eq!(z_value.as_deref(), Some(newest.as_slice()));

    let version = tree.current_version();
    let l1_has_tables = version.level(1).is_some_and(|level| !level.is_empty());
    assert!(
        !l1_has_tables,
        "L1 should remain empty after intra-L0 compaction"
    );

    Ok(())
}
/// Writes the same key in two flushed tables, compacts them down to one L0
/// run, then flushes a third write of that key. Asserts that L0 holds two
/// runs again and that a read returns the value from the latest flush.
#[test]
fn leveled_intra_l0_preserves_newer_run_ordering() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Two flushed versions of the same key; asserted below to form two L0 runs.
    for (seqno, value) in [(0, "old_1"), (1, "old_2")] {
        tree.insert("key", value, seqno);
        tree.flush_active_memtable(0)?;
    }
    assert_eq!(2, tree.l0_run_count());

    let target_size = 128 * 1024 * 1024;
    let strategy = Arc::new(
        Strategy::default()
            .with_l0_threshold(4)
            .with_table_target_size(target_size),
    );
    tree.compact(strategy, 0)?;
    assert_eq!(1, tree.l0_run_count());

    // A fresh flush after the merge adds a second run next to the merged one.
    tree.insert("key", "newest", 2);
    tree.flush_active_memtable(0)?;
    assert_eq!(2, tree.l0_run_count());

    let current = tree.get("key", MAX_SEQNO)?;
    assert_eq!(
        current.as_deref(),
        Some(b"newest".as_slice()),
        "newer L0 run must be found before merged (older) run"
    );

    Ok(())
}
/// Flushes four tables that all contain "a" and "z" (plus one unique key
/// each), compacts with the default strategy, and asserts that a single
/// table remains.
#[test]
fn leveled_l0_reached_limit() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    for suffix in 0..4u8 {
        let unique_key = [b'k', suffix];
        tree.insert("a", "v", 0);
        tree.insert(unique_key.as_slice(), "v", 0);
        tree.insert("z", "v", 0);
        tree.flush_active_memtable(0)?;
    }
    let tables_before = tree.table_count();
    assert_eq!(4, tables_before);

    let leveled = Arc::new(Strategy::default());
    tree.compact(leveled, 0)?;

    let tables_after = tree.table_count();
    assert_eq!(1, tables_after);

    Ok(())
}
/// Flushes four tables that each hold a single, distinct key, compacts with
/// the default strategy, and asserts that the table count stays at four.
#[test]
fn leveled_l0_reached_limit_disjoint() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    for suffix in 0..4u8 {
        let key = [b'k', suffix];
        tree.insert(key.as_slice(), "v", 0);
        tree.flush_active_memtable(0)?;
    }
    let tables_before = tree.table_count();
    assert_eq!(4, tables_before);

    let leveled = Arc::new(Strategy::default());
    tree.compact(leveled, 0)?;

    let tables_after = tree.table_count();
    assert_eq!(4, tables_after);

    Ok(())
}
/// First flushes four tables that all contain "a" and "b" and compacts them
/// into one table. Then flushes four more tables with one distinct `k` key
/// each and compacts again, asserting that the table count stays at five.
#[test]
fn leveled_l0_reached_limit_disjoint_l1() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Phase 1: four tables sharing the same two keys collapse into one table.
    for seqno in 0..4 {
        tree.insert("a", "v", seqno);
        tree.insert("b", "v", seqno);
        tree.flush_active_memtable(0)?;
    }
    let first_pass = Arc::new(Strategy::default());
    tree.compact(first_pass, 0)?;
    assert_eq!(1, tree.table_count());

    // Phase 2: four single-key tables are added on top of the merged table.
    for suffix in 0..4u8 {
        let key = [b'k', suffix];
        tree.insert(key.as_slice(), "v", 0);
        tree.flush_active_memtable(0)?;
    }
    assert_eq!(5, tree.table_count());

    let second_pass = Arc::new(Strategy::default());
    tree.compact(second_pass, 0)?;
    assert_eq!(5, tree.table_count());

    Ok(())
}
/// Inserts 30 increasing big-endian keys, flushing and compacting after each
/// one with a strategy whose `target_size` is 1. After every step the table
/// count must equal the number of flushes so far (both before and after the
/// compaction), and levels 1 through 5 must contain no tables.
#[test]
#[expect(clippy::unwrap_used)]
fn leveled_sequential_inserts() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let strategy = Arc::new(Strategy {
        target_size: 1,
        ..Default::default()
    });

    // `expected_tables` counts flushes so far: 1 after the first key, 2 after
    // the second, and so on.
    for (key, expected_tables) in (0u64..30).zip(1..) {
        tree.insert(key.to_be_bytes(), "", 0);
        tree.flush_active_memtable(0)?;
        assert_eq!(expected_tables, tree.table_count());

        tree.compact(strategy.clone(), 0)?;
        assert_eq!(expected_tables, tree.table_count());

        let version = tree.current_version();
        for idx in 1..=5 {
            let level = version.level(idx).unwrap();
            assert_eq!(
                0,
                level.len(),
                "no tables should be in intermediary level (L{idx})",
            );
        }
    }

    Ok(())
}
/// Compacts a freshly opened tree with dynamic level bytes enabled and
/// checks that the table count is still zero afterwards.
#[test]
fn dynamic_leveling_empty() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    let dynamic = Arc::new(Strategy::default().with_dynamic_level_bytes(true));
    tree.compact(dynamic, 0)?;

    let remaining_tables = tree.table_count();
    assert_eq!(0, remaining_tables);

    Ok(())
}
/// Flushes four tables that all contain "a" and "z" (plus one unique key
/// each), compacts once with dynamic level bytes enabled, and asserts that
/// fewer than four tables remain while every unique key stays readable.
#[test]
fn dynamic_leveling_basic_compaction() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    for suffix in 0..4u8 {
        // The loop index doubles as the sequence number of this batch.
        let seqno = u64::from(suffix);
        let unique_key = [b'k', suffix];

        tree.insert("a", "v", seqno);
        tree.insert(unique_key.as_slice(), "v", seqno);
        tree.insert("z", "v", seqno);
        tree.flush_active_memtable(seqno)?;
    }
    assert_eq!(4, tree.table_count());

    let dynamic = Arc::new(Strategy::default().with_dynamic_level_bytes(true));
    tree.compact(dynamic, 4)?;
    assert!(tree.table_count() < 4, "tables should be compacted");

    for suffix in 0..4u8 {
        let found = tree.get([b'k', suffix].as_slice(), MAX_SEQNO)?;
        assert!(found.is_some());
    }

    Ok(())
}
/// Runs five rounds of four flushes each, overwriting "a", "z" and the four
/// `k` keys with the value `r<round>`, compacting once per round with dynamic
/// level bytes enabled and a table target size of 1. Afterwards every `k`
/// key must read back as `r4`, the value from the final round.
#[test]
fn dynamic_leveling_data_integrity() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let strategy = Arc::new(
        Strategy::default()
            .with_dynamic_level_bytes(true)
            .with_table_target_size(1),
    );

    for round in 0..5u64 {
        let payload = format!("r{round}");

        for k in 0..4u8 {
            // Sequence numbers increase by one per flush across all rounds.
            let seqno = round * 4 + u64::from(k);

            tree.insert("a", payload.as_bytes(), seqno);
            tree.insert([b'k', k].as_slice(), payload.as_bytes(), seqno);
            tree.insert("z", payload.as_bytes(), seqno);
            tree.flush_active_memtable(seqno)?;
        }

        let threshold = (round + 1) * 4;
        tree.compact(strategy.clone(), threshold)?;
    }

    for k in 0..4u8 {
        let stored = tree.get([b'k', k].as_slice(), MAX_SEQNO)?;
        assert!(stored.is_some(), "key k{k} should exist");
        assert_eq!(stored.as_deref(), Some(b"r4".as_slice()));
    }

    Ok(())
}
/// Flushes four tables that all contain "a" and "z" (plus one unique key
/// each) and compacts once with multi-level enabled on an otherwise default
/// strategy. Asserts that fewer than four tables remain, that level 2 holds
/// no tables, and that every unique key is still readable.
#[test]
fn multi_level_no_skip_when_l1_has_room() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    for suffix in 0..4u8 {
        let seqno = u64::from(suffix);
        let unique_key = [b'k', suffix];

        tree.insert("a", "v", seqno);
        tree.insert(unique_key.as_slice(), "v", seqno);
        tree.insert("z", "v", seqno);
        tree.flush_active_memtable(seqno)?;
    }

    let multi = Arc::new(Strategy::default().with_multi_level(true));
    tree.compact(multi, 4)?;
    assert!(tree.table_count() < 4);

    let version = tree.current_version();
    let l2_has_tables = version.level(2).is_some_and(|level| !level.is_empty());
    assert!(
        !l2_has_tables,
        "L2 should remain empty when L1 has room (no multi-level skip)",
    );

    for suffix in 0..4u8 {
        let found = tree.get([b'k', suffix].as_slice(), MAX_SEQNO)?;
        assert!(found.is_some());
    }

    Ok(())
}
/// Runs eight rounds of four flushes each, overwriting "a", "z" and the four
/// `k` keys with the value `r<round>`, compacting once per round with
/// multi-level enabled and a table target size of 1. Afterwards every `k`
/// key must read back as `r7`, the value from the final round.
#[test]
fn multi_level_data_integrity() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let strategy = Arc::new(
        Strategy::default()
            .with_multi_level(true)
            .with_table_target_size(1),
    );

    for round in 0..8u64 {
        let payload = format!("r{round}");
        let first_seqno = round * 4;

        for k in 0..4u8 {
            // Sequence numbers increase by one per flush across all rounds.
            let seqno = first_seqno + u64::from(k);

            tree.insert("a", payload.as_bytes(), seqno);
            tree.insert([b'k', k].as_slice(), payload.as_bytes(), seqno);
            tree.insert("z", payload.as_bytes(), seqno);
            tree.flush_active_memtable(seqno)?;
        }

        tree.compact(strategy.clone(), (round + 1) * 4)?;
    }

    for k in 0..4u8 {
        let stored = tree.get([b'k', k].as_slice(), MAX_SEQNO)?;
        assert!(stored.is_some(), "key k{k} should exist");
        assert_eq!(stored.as_deref(), Some(b"r7".as_slice()));
    }

    Ok(())
}
/// The default strategy must report the name "LeveledCompaction".
#[test]
fn leveled_get_name() {
    use crate::compaction::CompactionStrategy;

    let leveled = Strategy::default();
    let reported_name = leveled.get_name();
    assert_eq!(reported_name, "LeveledCompaction");
}
/// Checks that `get_config()` on a strategy with dynamic level bytes and
/// multi-level enabled contains an entry for each of the five expected keys.
#[test]
fn leveled_get_config_includes_new_fields() {
    use crate::compaction::CompactionStrategy;

    let strategy = Strategy::default()
        .with_dynamic_level_bytes(true)
        .with_multi_level(true);
    let config = strategy.get_config();

    // Only the keys matter here; the values of the pairs are ignored.
    let keys = config.iter().map(|(k, _)| k.as_ref()).collect::<Vec<_>>();

    let has_dynamic = keys.iter().any(|k| k == b"leveled_dynamic");
    assert!(has_dynamic, "should have leveled_dynamic key");

    let has_multi_level = keys.iter().any(|k| k == b"leveled_multi_level");
    assert!(has_multi_level, "should have leveled_multi_level key");

    let has_l0_threshold = keys.iter().any(|k| k == b"leveled_l0_threshold");
    assert!(has_l0_threshold, "should have leveled_l0_threshold key");

    let has_target_size = keys.iter().any(|k| k == b"leveled_target_size");
    assert!(has_target_size, "should have leveled_target_size key");

    let has_ratio_policy = keys.iter().any(|k| k == b"leveled_level_ratio_policy");
    assert!(
        has_ratio_policy,
        "should have leveled_level_ratio_policy key",
    );
}
/// Chains every `with_*` builder method once and checks that the resulting
/// strategy yields a non-empty config and still reports its name.
#[test]
fn leveled_builder_chaining() {
    use crate::compaction::CompactionStrategy;

    let target_size = 32 * 1024 * 1024;
    let ratios = vec![8.0, 10.0];

    let strategy = Strategy::default()
        .with_l0_threshold(8)
        .with_table_target_size(target_size)
        .with_level_ratio_policy(ratios)
        .with_dynamic_level_bytes(true)
        .with_multi_level(true);

    let config = strategy.get_config();
    let config_is_empty = config.is_empty();
    assert!(!config_is_empty);

    assert_eq!(strategy.get_name(), "LeveledCompaction");
}
/// Runs twelve rounds of four flushes each (every table contains "a", "z"
/// and one unique `key_<seqno>` key), compacting three times per round with
/// dynamic level bytes enabled, an L0 threshold of 4 and a table target size
/// of 1. Afterwards every unique key written must still be readable.
#[test]
fn dynamic_leveling_multiple_levels() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let strategy = Arc::new(
        Strategy::default()
            .with_dynamic_level_bytes(true)
            .with_l0_threshold(4)
            .with_table_target_size(1),
    );

    // `seqno` is bumped once per flush and also names the unique key.
    let mut seqno = 0u64;

    for _round in 0..12 {
        for _k in 0..4 {
            let unique_key = format!("key_{seqno}");

            tree.insert("a", "val", seqno);
            tree.insert(unique_key.as_bytes(), "val", seqno);
            tree.insert("z", "val", seqno);
            tree.flush_active_memtable(seqno)?;

            seqno += 1;
        }

        // Several compaction passes per round.
        for _pass in 0..3 {
            tree.compact(strategy.clone(), seqno)?;
        }
    }

    for s in 0..seqno {
        let lookup = tree.get(format!("key_{s}").as_bytes(), MAX_SEQNO)?;
        assert!(lookup.is_some(), "key_{s} should exist");
    }

    Ok(())
}
/// Seeds the tree with three rounds of four flushes, each round followed by
/// a compaction using a default strategy with an L0 threshold of 4. Then
/// flushes eight more tables and compacts once with multi-level enabled, a
/// table target size of 64 and an L0 threshold of 4.
///
/// Asserts that this last compaction reports `Merged` with a destination
/// level of at least 2, that some level from 2 upwards is non-empty, and
/// that every unique `k_<seqno>` key is still readable.
#[test]
fn multi_level_skip_fires_when_l1_oversized() -> crate::Result<()> {
    use crate::compaction::CompactionAction;

    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Writes "a", "z" and one unique key at `seqno`, flushes, and returns the
    // next sequence number to use.
    let write_and_flush = |seqno: u64| -> crate::Result<u64> {
        tree.insert("a", "val", seqno);
        tree.insert(format!("k_{seqno}").as_bytes(), "val", seqno);
        tree.insert("z", "val", seqno);
        tree.flush_active_memtable(seqno)?;
        Ok(seqno + 1)
    };

    let leveled = Arc::new(Strategy::default().with_l0_threshold(4));
    let mut seqno = 0u64;

    for _round in 0..3 {
        for _k in 0..4 {
            seqno = write_and_flush(seqno)?;
        }
        tree.compact(leveled.clone(), seqno)?;
    }

    let multi = Arc::new(
        Strategy::default()
            .with_multi_level(true)
            .with_table_target_size(64)
            .with_l0_threshold(4),
    );

    for _k in 0..8 {
        seqno = write_and_flush(seqno)?;
    }

    let result = tree.compact(multi, seqno)?;
    assert_eq!(
        result.action,
        CompactionAction::Merged,
        "multi-level skip should produce a Merged action",
    );

    let dest_level = result.dest_level;
    assert!(
        dest_level.is_some_and(|lvl| lvl >= 2),
        "multi-level skip should target L2+, got {:?}",
        dest_level,
    );

    let version = tree.current_version();
    let has_deep_data = (2..version.level_count())
        .filter_map(|idx| version.level(idx))
        .any(|level| !level.is_empty());
    assert!(
        has_deep_data,
        "data should exist in L2+ after compaction with multi_level",
    );

    for s in 0..seqno {
        let lookup = tree.get(format!("k_{s}").as_bytes(), MAX_SEQNO)?;
        assert!(lookup.is_some(), "key k_{s} should exist");
    }

    Ok(())
}
/// Seeds the tree with three rounds of four flushes (each table holds "a",
/// "z" and one unique `k_<seqno>` key), compacting after each round with a
/// default strategy whose L0 threshold is 4. Then flushes eight more tables
/// and compacts once with multi-level enabled, a table target size of 64 and
/// an L0 threshold of 4.
///
/// Asserts that this last compaction reports `Merged` with a destination
/// level of at least 2, that some level from 2 upwards is non-empty, and
/// that no unique key was lost.
#[test]
fn multi_level_sparse_keyspace_data_integrity() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let config = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open()?;

    let mut seqno = 0u64;

    // Seeding phase: plain leveled compaction after every four flushes.
    let seeding = Arc::new(Strategy::default().with_l0_threshold(4));
    for _round in 0..3 {
        for _k in 0..4 {
            let unique_key = format!("k_{seqno}");
            tree.insert("a", "val", seqno);
            tree.insert(unique_key.as_bytes(), "val", seqno);
            tree.insert("z", "val", seqno);
            tree.flush_active_memtable(seqno)?;
            seqno += 1;
        }
        tree.compact(seeding.clone(), seqno)?;
    }

    // Second phase: eight more flushes, then a single multi-level compaction.
    let multi = Arc::new(
        Strategy::default()
            .with_multi_level(true)
            .with_table_target_size(64)
            .with_l0_threshold(4),
    );
    for _k in 0..8 {
        let unique_key = format!("k_{seqno}");
        tree.insert("a", "val", seqno);
        tree.insert(unique_key.as_bytes(), "val", seqno);
        tree.insert("z", "val", seqno);
        tree.flush_active_memtable(seqno)?;
        seqno += 1;
    }

    let outcome = tree.compact(multi, seqno)?;
    assert_eq!(
        outcome.action,
        crate::compaction::CompactionAction::Merged,
        "compaction should produce a Merged action",
    );
    let dest_level = outcome.dest_level;
    assert!(
        dest_level.is_some_and(|lvl| lvl >= 2),
        "compaction should target L2+, got {:?}",
        dest_level,
    );

    // Scan levels 2 and deeper for the first one that holds tables.
    let version = tree.current_version();
    let mut has_deep_data = false;
    for idx in 2..version.level_count() {
        if version.level(idx).is_some_and(|l| !l.is_empty()) {
            has_deep_data = true;
            break;
        }
    }
    assert!(
        has_deep_data,
        "data should exist in L2+ after compaction with multi_level",
    );

    for s in 0..seqno {
        let lookup = tree.get(format!("k_{s}").as_bytes(), MAX_SEQNO)?;
        assert!(lookup.is_some(), "key k_{s} should exist");
    }

    Ok(())
}
/// Flushes four tables that all contain "a" and "z" (plus one unique key
/// each) and compacts once with dynamic level bytes enabled, a table target
/// size of 64 * 1024 * 1024 and an L0 threshold of 4. Asserts that the
/// compaction did not report `Nothing` and that every unique key is still
/// readable.
#[test]
fn dynamic_leveling_fallback_to_static() -> crate::Result<()> {
    use crate::compaction::CompactionAction;

    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let target_size = 64 * 1024 * 1024;
    let strategy = Arc::new(
        Strategy::default()
            .with_dynamic_level_bytes(true)
            .with_table_target_size(target_size)
            .with_l0_threshold(4),
    );

    for suffix in 0..4u8 {
        let seqno = u64::from(suffix);
        let unique_key = [b'k', suffix];

        tree.insert("a", "v", seqno);
        tree.insert(unique_key.as_slice(), "v", seqno);
        tree.insert("z", "v", seqno);
        tree.flush_active_memtable(seqno)?;
    }

    let outcome = tree.compact(strategy, 4)?;
    assert_ne!(
        outcome.action,
        CompactionAction::Nothing,
        "compaction should have done work",
    );

    for suffix in 0..4u8 {
        let found = tree.get([b'k', suffix].as_slice(), MAX_SEQNO)?;
        assert!(found.is_some());
    }

    Ok(())
}
/// Runs ten rounds of four flushes each (every table contains "a", "z" and
/// one unique `key_<seqno>` key), compacting three times per round with both
/// dynamic level bytes and multi-level enabled and a table target size of 1.
/// Afterwards every unique key written must still be readable.
#[test]
fn multi_level_with_both_flags() -> crate::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(
        folder.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    let combined = Arc::new(
        Strategy::default()
            .with_dynamic_level_bytes(true)
            .with_multi_level(true)
            .with_table_target_size(1),
    );

    // `seqno` is bumped once per flush and also names the unique key.
    let mut seqno = 0u64;

    for _round in 0..10 {
        for _k in 0..4 {
            let unique_key = format!("key_{seqno}");

            tree.insert("a", "val", seqno);
            tree.insert(unique_key.as_bytes(), "val", seqno);
            tree.insert("z", "val", seqno);
            tree.flush_active_memtable(seqno)?;

            seqno += 1;
        }

        for _pass in 0..3 {
            tree.compact(combined.clone(), seqno)?;
        }
    }

    for s in 0..seqno {
        let lookup = tree.get(format!("key_{s}").as_bytes(), MAX_SEQNO)?;
        assert!(lookup.is_some(), "key_{s} should exist");
    }

    Ok(())
}