use super::*;
use crate::{
AbstractTree, Config, MAX_SEQNO, SequenceNumberCounter, compaction::CompactionStrategy,
};
use std::sync::Arc;
use test_log::test;
fn flush_overlapping(
tree: &impl crate::AbstractTree,
count: u8,
seqno_base: u64,
) -> crate::Result<()> {
for i in 0..count {
let seqno = seqno_base + u64::from(i);
tree.insert("a", "v", seqno);
tree.insert([b'k', i].as_slice(), "v", seqno);
tree.insert("z", "v", seqno);
tree.flush_active_memtable(seqno)?;
}
Ok(())
}
#[test]
fn stcs_empty_levels() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
let strategy = Arc::new(Strategy::default());
tree.compact(strategy, 0)?;
assert_eq!(0, tree.table_count());
Ok(())
}
#[test]
fn stcs_below_min_merge_width() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 2, 0)?;
assert_eq!(2, tree.table_count());
assert!(tree.l0_run_count() > 1, "runs should be separate");
let strategy = Arc::new(Strategy::default());
tree.compact(strategy, 2)?;
assert_eq!(2, tree.table_count());
Ok(())
}
#[test]
fn stcs_triggers_merge() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 4, 0)?;
assert_eq!(4, tree.table_count());
let strategy = Arc::new(Strategy::default().with_min_merge_width(4));
tree.compact(strategy, 4)?;
assert_eq!(1, tree.table_count());
for i in 0..4u8 {
assert!(
tree.get([b'k', i].as_slice(), MAX_SEQNO)?.is_some(),
"key k{i} should exist after compaction",
);
}
Ok(())
}
#[test]
fn stcs_min_merge_width_2() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 2, 0)?;
assert_eq!(2, tree.table_count());
let strategy = Arc::new(Strategy::default().with_min_merge_width(2));
tree.compact(strategy, 2)?;
assert_eq!(1, tree.table_count());
Ok(())
}
#[test]
fn stcs_space_amp_full_compaction() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 5, 0)?;
assert_eq!(5, tree.table_count());
let strategy = Arc::new(
Strategy::default()
.with_min_merge_width(100)
.with_max_space_amplification_percent(200),
);
tree.compact(strategy, 5)?;
assert_eq!(1, tree.table_count());
Ok(())
}
#[test]
fn stcs_max_merge_width_cap() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 8, 0)?;
assert_eq!(8, tree.table_count());
let strategy = Arc::new(
Strategy::default()
.with_min_merge_width(2)
.with_max_merge_width(3)
.with_max_space_amplification_percent(u64::MAX),
);
tree.compact(strategy, 8)?;
assert_eq!(6, tree.table_count());
Ok(())
}
#[test]
fn stcs_data_integrity_multi_compact() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
for batch in 0..5u64 {
tree.insert("a", format!("v{batch}").as_bytes(), batch);
for k in 0..4u8 {
let key = [b'k', k];
let val = format!("v{batch}");
tree.insert(key.as_slice(), val.as_bytes(), batch);
}
tree.insert("z", format!("v{batch}").as_bytes(), batch);
tree.flush_active_memtable(batch)?;
}
assert_eq!(5, tree.table_count());
let strategy = Arc::new(Strategy::default().with_min_merge_width(2));
for seqno in 5..8u64 {
tree.compact(strategy.clone(), seqno)?;
}
for k in 0..4u8 {
let val = tree.get([b'k', k].as_slice(), MAX_SEQNO)?;
assert!(val.is_some(), "key k{k} should exist");
assert_eq!(val.as_deref(), Some(b"v4".as_slice()));
}
Ok(())
}
#[test]
fn stcs_no_space_amp_trigger_below_threshold() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 2, 0)?;
let strategy = Arc::new(
Strategy::default()
.with_min_merge_width(100)
.with_max_space_amplification_percent(200),
);
tree.compact(strategy, 2)?;
assert_eq!(2, tree.table_count());
Ok(())
}
#[test]
fn stcs_multiple_compaction_cycles() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
let strategy = Arc::new(Strategy::default().with_min_merge_width(2));
let mut seqno = 0u64;
for _cycle in 0..3 {
for _k in 0..3 {
tree.insert("a", "val", seqno);
tree.insert(format!("key_{seqno}").as_bytes(), "val", seqno);
tree.insert("z", "val", seqno);
tree.flush_active_memtable(seqno)?;
seqno += 1;
}
tree.compact(strategy.clone(), seqno)?;
}
for s in 0..seqno {
assert!(
tree.get(format!("key_{s}").as_bytes(), MAX_SEQNO)?
.is_some(),
"key_{s} should exist after multiple compaction cycles",
);
}
Ok(())
}
#[test]
fn stcs_get_name() {
let strategy = Strategy::default();
assert_eq!(strategy.get_name(), "SizeTieredCompaction");
}
#[test]
fn stcs_get_config_serialization() {
let strategy = Strategy::default()
.with_size_ratio(0.5)
.with_min_merge_width(8)
.with_max_merge_width(16)
.with_max_space_amplification_percent(300)
.with_table_target_size(128 * 1024 * 1024);
let config = strategy.get_config();
assert_eq!(config.len(), 5, "should serialize all 5 parameters");
let keys: Vec<_> = config.iter().map(|(k, _)| k.as_ref()).collect();
assert!(keys.iter().any(|k| k == b"tiered_size_ratio"));
assert!(keys.iter().any(|k| k == b"tiered_min_merge_width"));
assert!(keys.iter().any(|k| k == b"tiered_max_merge_width"));
assert!(keys.iter().any(|k| k == b"tiered_max_space_amp_pct"));
assert!(keys.iter().any(|k| k == b"tiered_target_size"));
}
#[test]
fn stcs_builder_with_size_ratio() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 4, 0)?;
let strategy = Arc::new(
Strategy::default()
.with_size_ratio(0.01)
.with_min_merge_width(2)
.with_max_space_amplification_percent(u64::MAX),
);
tree.compact(strategy, 4)?;
assert!(tree.table_count() < 4);
Ok(())
}
#[test]
fn stcs_max_merge_width_less_than_min_no_merge() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
flush_overlapping(&tree, 4, 0)?;
assert_eq!(4, tree.table_count());
let strategy = Arc::new(
Strategy::default()
.with_min_merge_width(4)
.with_max_merge_width(2)
.with_max_space_amplification_percent(u64::MAX),
);
tree.compact(strategy, 4)?;
assert_eq!(4, tree.table_count());
Ok(())
}
#[test]
fn stcs_single_run_no_compaction() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "v", 0);
tree.flush_active_memtable(0)?;
assert_eq!(1, tree.table_count());
let strategy = Arc::new(Strategy::default().with_min_merge_width(2));
tree.compact(strategy, 1)?;
assert_eq!(1, tree.table_count());
Ok(())
}
#[test]
fn stcs_dissimilar_sizes_break() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
tree.insert("a", "v", 0);
tree.insert("z", "v", 0);
tree.flush_active_memtable(0)?;
for k in 0..200u16 {
tree.insert("a", "v", 1);
tree.insert(k.to_be_bytes().as_slice(), "large_value_padding_xxxxx", 1);
tree.insert("z", "v", 1);
}
tree.flush_active_memtable(1)?;
let strategy = Arc::new(
Strategy::default()
.with_size_ratio(0.01)
.with_min_merge_width(2)
.with_max_space_amplification_percent(u64::MAX),
);
tree.compact(strategy, 2)?;
assert_eq!(2, tree.table_count());
Ok(())
}