use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
HashSet, KvPair, TableId, compaction::state::CompactionState, config::Config, table::Table,
version::Version,
};
#[cfg(test)]
mod tests;
/// Identifier of the size-tiered strategy; this is the value `get_name` returns.
#[doc(hidden)]
pub const NAME: &str = "SizeTieredCompaction";
/// Settings of the size-tiered compaction strategy.
///
/// Start from [`Strategy::default`] and adjust individual settings by chaining
/// the `with_*` builder methods.
#[derive(Clone)]
pub struct Strategy {
    /// Tolerated growth between two size-adjacent runs: a larger run is still
    /// grouped with its smaller neighbour while
    /// `larger / smaller <= 1.0 + size_ratio`.
    size_ratio: f64,
    /// Fewest runs a ratio-based merge has to contain.
    min_merge_width: usize,
    /// Most runs a ratio-based merge may contain.
    max_merge_width: usize,
    /// Space amplification threshold in percent; once reached, all available
    /// runs are merged together.
    max_space_amplification_percent: u64,
    /// Target size in bytes that is attached to every merge choice.
    target_size: u64,
}

impl Default for Strategy {
    /// Size ratio `1.0`, merge width `4..=usize::MAX`, 200 % space
    /// amplification and a 64 MiB target size.
    fn default() -> Self {
        const DEFAULT_TARGET_SIZE: u64 = 64 * 1_024 * 1_024;

        Self {
            size_ratio: 1.0,
            min_merge_width: 4,
            max_merge_width: usize::MAX,
            max_space_amplification_percent: 200,
            target_size: DEFAULT_TARGET_SIZE,
        }
    }
}

impl Strategy {
    /// Sets the size ratio.
    ///
    /// Only finite, non-negative values are accepted; anything else (negative
    /// numbers, NaN, infinities) falls back to `1.0`.
    #[must_use]
    pub fn with_size_ratio(self, ratio: f64) -> Self {
        let accepted = ratio.is_finite() && ratio >= 0.0;

        Self {
            size_ratio: if accepted { ratio } else { 1.0 },
            ..self
        }
    }

    /// Sets the minimum number of runs per merge; values below 2 are raised to 2.
    #[must_use]
    pub fn with_min_merge_width(self, width: usize) -> Self {
        Self {
            min_merge_width: width.max(2),
            ..self
        }
    }

    /// Sets the maximum number of runs per merge; values below 2 are raised to 2.
    #[must_use]
    pub fn with_max_merge_width(self, width: usize) -> Self {
        Self {
            max_merge_width: width.max(2),
            ..self
        }
    }

    /// Sets the space amplification threshold (in percent); stored as given.
    #[must_use]
    pub fn with_max_space_amplification_percent(self, percent: u64) -> Self {
        Self {
            max_space_amplification_percent: percent,
            ..self
        }
    }

    /// Sets the target table size in bytes; stored as given.
    #[must_use]
    pub fn with_table_target_size(self, bytes: u64) -> Self {
        Self {
            target_size: bytes,
            ..self
        }
    }
}
/// Condensed description of one run taken from `version.l0()`, as produced by
/// `collect_available_runs`.
struct RunInfo {
// Sum of the file sizes of all tables in the run.
size: u64,
// IDs of all tables in the run.
table_ids: Vec<TableId>,
}
/// Gathers the runs of `version.l0()` that can take part in a compaction.
///
/// A run is left out entirely as soon as one of its tables is reported as
/// hidden by `state.hidden_set()`. Every remaining run is condensed into a
/// [`RunInfo`] holding the summed file size and the IDs of its tables; the
/// iteration order of the runs is preserved.
fn collect_available_runs(version: &Version, state: &CompactionState) -> Vec<RunInfo> {
    let level_zero = version.l0();

    level_zero
        .iter()
        .filter(|run| {
            // One hidden table is enough to disqualify the whole run.
            !run.iter()
                .any(|table| state.hidden_set().is_hidden(table.id()))
        })
        .map(|run| RunInfo {
            size: run.iter().map(Table::file_size).sum::<u64>(),
            table_ids: run.iter().map(Table::id).collect(),
        })
        .collect()
}
impl CompactionStrategy for Strategy {
    /// Returns the strategy identifier, [`NAME`].
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Serializes the strategy settings as key-value pairs.
    ///
    /// All numbers are encoded little-endian: the size ratio as the 8 bytes of
    /// its `f64`, both merge widths as `u32`, and the space amplification
    /// percentage and target size as `u64`.
    fn get_config(&self) -> Vec<KvPair> {
        // Widths are persisted as u32; anything that does not fit (e.g. the
        // default `usize::MAX`) is stored as `u32::MAX`.
        let min_width = u32::try_from(self.min_merge_width).unwrap_or(u32::MAX);
        let max_width = u32::try_from(self.max_merge_width).unwrap_or(u32::MAX);

        vec![
            (
                crate::UserKey::from("tiered_size_ratio"),
                crate::UserValue::from(self.size_ratio.to_le_bytes()),
            ),
            (
                crate::UserKey::from("tiered_min_merge_width"),
                crate::UserValue::from(min_width.to_le_bytes()),
            ),
            (
                crate::UserKey::from("tiered_max_merge_width"),
                crate::UserValue::from(max_width.to_le_bytes()),
            ),
            (
                crate::UserKey::from("tiered_max_space_amp_pct"),
                crate::UserValue::from(self.max_space_amplification_percent.to_le_bytes()),
            ),
            (
                crate::UserKey::from("tiered_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
        ]
    }

    /// Decides what to compact next among the runs returned by
    /// `collect_available_runs`.
    ///
    /// 1. With fewer than two available runs nothing is done.
    /// 2. Space amplification check: if
    ///    `total_size * 100 >= largest_run_size * (100 + max_space_amplification_percent)`
    ///    (and the largest run is not empty), all available runs are merged.
    /// 3. Otherwise the runs are sorted by ascending size and the longest
    ///    prefix is taken in which every run is at most `1.0 + size_ratio`
    ///    times as large as its predecessor (a zero-sized predecessor always
    ///    admits its successor). The prefix is capped at `max_merge_width`
    ///    runs and merged if it still holds at least `min_merge_width` runs.
    ///
    /// Every merge targets level 0 and carries the configured target size.
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        let mut runs = collect_available_runs(version, state);
        if runs.len() < 2 {
            return Choice::DoNothing;
        }

        // Both merge paths differ only in the set of input tables.
        let merge = |table_ids: HashSet<TableId>| {
            Choice::Merge(CompactionInput {
                table_ids,
                dest_level: 0,
                canonical_level: 0,
                target_size: self.target_size,
            })
        };

        let total_size = runs.iter().map(|run| run.size).sum::<u64>();
        let largest_run_size = runs.iter().map(|run| run.size).max().unwrap_or(0);

        // Compared in u128 with saturating multiplication so the scaled values
        // cannot wrap around.
        let amplification_reached = largest_run_size > 0 && {
            let scaled_total = u128::from(total_size).saturating_mul(100);
            let scaled_limit = u128::from(largest_run_size)
                .saturating_mul(100 + u128::from(self.max_space_amplification_percent));
            scaled_total >= scaled_limit
        };
        if amplification_reached {
            return merge(
                runs.iter()
                    .flat_map(|run| run.table_ids.iter().copied())
                    .collect(),
            );
        }

        // Stable sort, smallest run first.
        runs.sort_by_key(|run| run.size);

        // Count how many consecutive neighbour pairs (starting at the smallest
        // run) are similar enough in size.
        let similar_pairs = runs
            .windows(2)
            .take_while(|pair| {
                let [smaller, larger] = *pair else {
                    unreachable!("windows(2) always yields slices of length 2");
                };

                // Avoid dividing by zero: an empty run always admits its successor.
                if smaller.size == 0 {
                    return true;
                }

                #[expect(
                    clippy::cast_precision_loss,
                    reason = "precision loss is acceptable for ratio comparison"
                )]
                let growth = larger.size as f64 / smaller.size as f64;
                growth <= 1.0 + self.size_ratio
            })
            .count();
        let prefix_len = similar_pairs + 1;

        // The cap can only shrink the prefix, so checking the capped count
        // against the minimum width is sufficient.
        let merge_count = prefix_len.min(self.max_merge_width);
        if merge_count < self.min_merge_width {
            return Choice::DoNothing;
        }

        merge(
            runs.iter()
                .take(merge_count)
                .flat_map(|run| run.table_ids.iter().copied())
                .collect(),
        )
    }
}