pub mod filter;
pub mod merge;
#[cfg(test)]
mod merge_tests;
use crate::sstable::{SSTable, SSTableBuilder};
use bytes::Bytes;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use thiserror::Error;
pub use filter::{CompactionFilter, FilterDecision};
pub use merge::MergeIterator;
/// Errors produced by the compaction pipeline.
#[derive(Debug, Error)]
pub enum CompactionError {
    /// Filesystem failure while reading inputs or writing the output table.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Error surfaced by the SSTable layer (open / build / iterate).
    #[error("SSTable error: {0}")]
    SSTable(#[from] crate::sstable::SSTableError),
    /// A compaction entry point was called with an empty input list.
    #[error("No SSTables to compact")]
    NoInput,
}

/// Convenience alias: compaction operations always fail with [`CompactionError`].
pub type Result<T> = std::result::Result<T, CompactionError>;
/// Merge the SSTables at `input_paths` into a single table written to
/// `output_path`, applying MVCC garbage collection during the merge.
///
/// `compaction_level` and `oldest_snapshot` feed the GC decisions inside
/// [`MergeIterator::with_gc`]; `filter` optionally drops or rewrites entries
/// as they stream through.
///
/// Returns the output path together with the size (in bytes) of the file
/// that was produced.
///
/// # Errors
///
/// Returns [`CompactionError::NoInput`] when `input_paths` is empty, and
/// propagates I/O or SSTable errors from opening, merging, or writing.
pub fn compact_sstables(
    input_paths: &[PathBuf],
    output_path: impl AsRef<Path>,
    compaction_level: usize,
    filter: Option<Arc<dyn CompactionFilter>>,
    oldest_snapshot: u64,
) -> Result<(PathBuf, u64)> {
    if input_paths.is_empty() {
        return Err(CompactionError::NoInput);
    }
    // Open every input table up front; any failure aborts the compaction.
    let tables = input_paths
        .iter()
        .map(SSTable::open)
        .collect::<std::result::Result<Vec<_>, _>>()?;
    let merged = MergeIterator::with_gc(tables, compaction_level, filter, oldest_snapshot)?;
    let dest = output_path.as_ref().to_path_buf();
    let mut writer = SSTableBuilder::create(&dest)?;
    // Stream merged entries straight into the new table.
    for entry in merged {
        let (key, value) = entry?;
        writer.add_raw_mvcc(key, value)?;
    }
    writer.finish()?;
    // Report the on-disk size of the finished file.
    let written = std::fs::metadata(&dest)?.len();
    Ok((dest, written))
}
/// Merge the SSTables at `input_paths` entirely in memory, applying MVCC
/// garbage collection, and return the encoded output table as bytes.
///
/// Mirrors [`compact_sstables`] but never writes the result to disk: the
/// caller decides where (or whether) to persist the returned buffer.
///
/// # Errors
///
/// Returns [`CompactionError::NoInput`] when `input_paths` is empty, and
/// propagates I/O or SSTable errors from opening, merging, or encoding.
pub fn compact_sstables_buffered(
    input_paths: &[PathBuf],
    compaction_level: usize,
    filter: Option<Arc<dyn CompactionFilter>>,
    oldest_snapshot: u64,
) -> Result<Bytes> {
    if input_paths.is_empty() {
        return Err(CompactionError::NoInput);
    }
    // Open all inputs before starting the merge.
    let tables = input_paths
        .iter()
        .map(SSTable::open)
        .collect::<std::result::Result<Vec<_>, _>>()?;
    let merged = MergeIterator::with_gc(tables, compaction_level, filter, oldest_snapshot)?;
    let mut writer = SSTableBuilder::new_buffered();
    for entry in merged {
        let (key, value) = entry?;
        writer.add_raw_mvcc(key, value)?;
    }
    Ok(writer.finish_to_bytes()?)
}
/// A single level of the LSM tree: the set of SSTable files assigned to
/// that level plus the bookkeeping used to decide when to compact it.
#[derive(Debug, Clone)]
pub struct Level {
    // Index of this level within the tree (0 receives fresh flushes).
    level_num: usize,
    // Paths of the SSTable files currently assigned to this level.
    sstables: Vec<PathBuf>,
    // Total size in bytes of the tables above.
    size: u64,
    // When `size` reaches this value the level is due for compaction.
    size_threshold: u64,
}

impl Level {
    /// Create an empty level with the given index and size threshold.
    #[must_use]
    pub const fn new(level_num: usize, size_threshold: u64) -> Self {
        Self {
            level_num,
            sstables: Vec::new(),
            size: 0,
            size_threshold,
        }
    }

    /// Register an SSTable file and account its size toward the threshold.
    ///
    /// Uses saturating addition so an extreme size cannot overflow the
    /// counter — consistent with the saturating arithmetic used for level
    /// thresholds elsewhere in this module (a plain `+=` would panic in
    /// debug builds and silently wrap in release builds).
    pub fn add_sstable(&mut self, path: PathBuf, size: u64) {
        self.sstables.push(path);
        self.size = self.size.saturating_add(size);
    }

    /// Whether the accumulated size has reached the compaction threshold.
    #[must_use]
    pub const fn needs_compaction(&self) -> bool {
        self.size >= self.size_threshold
    }

    /// Number of SSTables currently in this level.
    #[must_use]
    pub const fn num_sstables(&self) -> usize {
        self.sstables.len()
    }

    /// Total size in bytes of this level's SSTables.
    #[must_use]
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Index of this level within the tree.
    #[must_use]
    pub const fn level_num(&self) -> usize {
        self.level_num
    }

    /// Paths of the SSTables in this level, in insertion order.
    #[must_use]
    pub fn sstables(&self) -> &[PathBuf] {
        &self.sstables
    }
}
/// Policy controlling the size ratio between adjacent LSM levels.
#[derive(Debug, Clone, Copy)]
pub enum CompactionStrategy {
    /// Constant ratio, never adjusted.
    Fixed(u64),
    /// Ratio tuned from the observed write/read mix, clamped to a range.
    Adaptive {
        current_ratio: u64,
        min_ratio: u64,
        max_ratio: u64,
    },
}

impl CompactionStrategy {
    /// The ratio currently in effect.
    #[must_use]
    pub const fn current_ratio(&self) -> u64 {
        match *self {
            Self::Fixed(ratio) => ratio,
            Self::Adaptive { current_ratio, .. } => current_ratio,
        }
    }

    /// Re-derive the ratio from cumulative `writes` and `reads` counters.
    ///
    /// A no-op for [`Self::Fixed`], and also when either counter is zero
    /// (the formula would divide by zero or collapse to zero). The new
    /// ratio is `sqrt(1.5 * writes / reads)`, rounded and clamped into
    /// `[min_ratio, max_ratio]` — write-heavy workloads push the ratio up,
    /// read-heavy workloads pull it down.
    pub fn adjust_for_workload(&mut self, writes: u64, reads: u64) {
        let Self::Adaptive {
            current_ratio,
            min_ratio,
            max_ratio,
        } = self
        else {
            return;
        };
        if writes == 0 || reads == 0 {
            return;
        }
        // Weight applied to writes in the cost model.
        const WRITE_WEIGHT: f64 = 1.5;
        let raw = ((WRITE_WEIGHT * writes as f64) / reads as f64).sqrt();
        let rounded = raw.round() as u64;
        *current_ratio = rounded.max(*min_ratio).min(*max_ratio);
    }
}
/// Multi-level LSM-tree layout: tracks which SSTable files belong to each
/// level and decides when a level needs compaction.
#[derive(Clone)]
pub struct LSMTree {
    // Level 0 receives new tables via `add_l0_sstable`; deeper levels hold
    // compacted data.
    levels: Vec<Level>,
    // Fixed or adaptive policy controlling the size ratio between levels.
    strategy: CompactionStrategy,
    // Threshold for level 1; deeper thresholds scale by the strategy ratio.
    base_size: u64,
    // Directory that holds the on-disk `.sst` files.
    data_dir: PathBuf,
    // Cumulative (writes, reads) counters seen at the last ratio adjustment.
    last_workload: (u64, u64),
}
impl LSMTree {
    /// Build a tree with a fixed size ratio between adjacent levels.
    pub fn new(
        data_dir: impl AsRef<Path>,
        base_size: u64,
        size_ratio: u64,
        num_levels: usize,
    ) -> Self {
        Self::with_strategy(
            data_dir,
            base_size,
            num_levels,
            CompactionStrategy::Fixed(size_ratio),
        )
    }

    /// Build a tree whose size ratio adapts to the observed workload,
    /// bounded by `min_ratio..=max_ratio`.
    pub fn new_adaptive(
        data_dir: impl AsRef<Path>,
        base_size: u64,
        num_levels: usize,
        min_ratio: u64,
        max_ratio: u64,
    ) -> Self {
        // Start at the midpoint of the allowed range; workload feedback
        // (`adjust_for_workload`) moves it toward either bound later.
        let initial_ratio = u64::midpoint(min_ratio, max_ratio); let strategy = CompactionStrategy::Adaptive {
            current_ratio: initial_ratio,
            min_ratio,
            max_ratio,
        };
        Self::with_strategy(data_dir, base_size, num_levels, strategy)
    }

    // Shared constructor: sizes each level's threshold from the strategy's
    // current ratio.
    fn with_strategy(
        data_dir: impl AsRef<Path>,
        base_size: u64,
        num_levels: usize,
        strategy: CompactionStrategy,
    ) -> Self {
        let size_ratio = strategy.current_ratio();
        let mut levels = Vec::with_capacity(num_levels);
        // L0 is triggered by file count in `needs_compaction`, so its size
        // threshold is effectively disabled.
        levels.push(Level::new(0, u64::MAX));
        for i in 1..num_levels {
            // Level 1 gets `base_size` (ratio^0); each deeper level is
            // `size_ratio` times larger. Saturating math guards against
            // overflow for deep trees or large ratios.
            let exponent = (i - 1) as u32;
            let multiplier = size_ratio.saturating_pow(exponent);
            let threshold = base_size.saturating_mul(multiplier);
            levels.push(Level::new(i, threshold));
        }
        Self {
            levels,
            strategy,
            base_size,
            data_dir: data_dir.as_ref().to_path_buf(),
            last_workload: (0, 0),
        }
    }

    /// Register a freshly written SSTable at level 0.
    pub fn add_l0_sstable(&mut self, path: PathBuf, size: u64) {
        self.levels[0].add_sstable(path, size);
    }

    /// Lowest level index that currently needs compaction, if any.
    ///
    /// Level 0 is triggered by file count (4 or more tables); deeper levels
    /// are triggered by their size thresholds.
    #[must_use]
    pub fn needs_compaction(&self) -> Option<usize> {
        if self.levels[0].num_sstables() >= 4 {
            return Some(0);
        }
        for (i, level) in self.levels.iter().enumerate().skip(1) {
            if level.needs_compaction() {
                return Some(i);
            }
        }
        None
    }

    /// The level at `level_num`, or `None` if out of range.
    #[must_use]
    pub fn level(&self, level_num: usize) -> Option<&Level> {
        self.levels.get(level_num)
    }

    /// Number of levels configured in this tree.
    #[must_use]
    pub const fn num_levels(&self) -> usize {
        self.levels.len()
    }

    /// Directory that holds this tree's SSTable files.
    #[must_use]
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }

    /// Feed cumulative `writes`/`reads` counters to the adaptive strategy.
    ///
    /// Skips adjustment until at least 1000 operations have accumulated
    /// since the last call that was acted upon. Returns `true` when the
    /// ratio actually changed (in which case the level thresholds are
    /// recomputed as well).
    pub fn adjust_for_workload(&mut self, writes: u64, reads: u64) -> bool {
        // Compare against the counters captured at the last adjustment;
        // saturating_sub tolerates counter resets.
        let (last_writes, last_reads) = self.last_workload;
        let delta_writes = writes.saturating_sub(last_writes);
        let delta_reads = reads.saturating_sub(last_reads);
        if delta_writes + delta_reads < 1000 {
            return false;
        }
        let old_ratio = self.strategy.current_ratio();
        // NOTE(review): the strategy receives the cumulative totals, not the
        // deltas — presumably intentional (long-run average); confirm.
        self.strategy.adjust_for_workload(writes, reads);
        let new_ratio = self.strategy.current_ratio();
        self.last_workload = (writes, reads);
        let changed = new_ratio != old_ratio;
        if changed {
            self.update_level_thresholds(new_ratio);
        }
        changed
    }

    // Recompute every level's size threshold for a new ratio (same formula
    // as `with_strategy`; level 0 is left count-triggered).
    fn update_level_thresholds(&mut self, size_ratio: u64) {
        for i in 1..self.levels.len() {
            let exponent = (i - 1) as u32;
            let multiplier = size_ratio.saturating_pow(exponent);
            let threshold = self.base_size.saturating_mul(multiplier);
            self.levels[i].size_threshold = threshold;
        }
    }

    /// The compaction strategy currently in use.
    #[must_use]
    pub const fn strategy(&self) -> &CompactionStrategy {
        &self.strategy
    }

    /// Register an SSTable at an arbitrary level; out-of-range levels are
    /// silently ignored.
    pub fn add_to_level(&mut self, level_num: usize, path: PathBuf, size: u64) {
        if let Some(level) = self.levels.get_mut(level_num) {
            level.add_sstable(path, size);
        }
    }

    /// Remove the given SSTable paths from a level, subtracting their sizes
    /// from the level's byte accounting.
    ///
    /// NOTE(review): the subtracted size is re-read from the filesystem at
    /// removal time; if the file is already deleted (or was modified since
    /// it was added) the level's size accounting drifts — confirm callers
    /// delete files only after calling this.
    pub fn remove_sstables_from_level(&mut self, level_num: usize, paths: &[PathBuf]) {
        if let Some(level) = self.levels.get_mut(level_num) {
            for path in paths {
                if let Some(pos) = level.sstables.iter().position(|p| p == path) {
                    let removed_path = level.sstables.remove(pos);
                    if let Ok(metadata) = std::fs::metadata(&removed_path) {
                        let size = metadata.len();
                        level.size = level.size.saturating_sub(size);
                    }
                }
            }
        }
    }

    /// Remove and return every SSTable path in a level, resetting its size.
    /// Returns an empty vector for an out-of-range level.
    pub fn clear_level(&mut self, level_num: usize) -> Vec<PathBuf> {
        if let Some(level) = self.levels.get_mut(level_num) {
            let paths = std::mem::take(&mut level.sstables);
            level.size = 0;
            paths
        } else {
            Vec::new()
        }
    }

    /// All SSTable paths across every level, shallow-to-deep.
    #[must_use]
    pub fn all_sstable_paths(&self) -> Vec<PathBuf> {
        let mut paths = Vec::new();
        for level in &self.levels {
            paths.extend(level.sstables.iter().cloned());
        }
        paths
    }

    /// Scan `data_dir` for `.sst` files and re-register them in the tree.
    ///
    /// The level is parsed from a filename of the form `L<level>_...sst`;
    /// unparsable names default to level 0, and a parsed level beyond the
    /// configured depth also falls back to level 0. Each table is opened
    /// and validated; any corrupt table aborts the whole load with an
    /// `InvalidData` error.
    pub fn load_existing_sstables(&mut self) -> std::io::Result<()> {
        use crate::sstable::SSTable;
        let entries = std::fs::read_dir(&self.data_dir)?;
        let mut sstable_paths: Vec<(PathBuf, usize)> = Vec::new();
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("sst") {
                // Extract the numeric level from "L<level>_..." file stems.
                let level = path
                    .file_stem()
                    .and_then(|s| s.to_str())
                    .and_then(|name| {
                        name.strip_prefix('L')
                            .and_then(|rest| rest.split('_').next()?.parse::<usize>().ok())
                    })
                    .unwrap_or(0);
                sstable_paths.push((path, level));
            }
        }
        // Sort by path so load order is deterministic across runs.
        sstable_paths.sort_by(|a, b| a.0.cmp(&b.0));
        for (path, level) in sstable_paths {
            let metadata = std::fs::metadata(&path)?;
            let size = metadata.len();
            let mut sstable = SSTable::open(&path).map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("Corrupt SSTable: {e}"),
                )
            })?;
            sstable.validate().map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("Corrupt SSTable: {e}"),
                )
            })?;
            if level < self.levels.len() {
                self.levels[level].add_sstable(path, size);
            } else {
                self.levels[0].add_sstable(path, size);
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tempfile::tempdir;

    // A fresh level starts empty and below its threshold.
    #[test]
    fn test_level_creation() {
        let level = Level::new(1, 10_000_000);
        assert_eq!(level.level_num(), 1);
        assert_eq!(level.size(), 0);
        assert!(!level.needs_compaction());
    }

    // Crossing the size threshold flips `needs_compaction`.
    #[test]
    fn test_level_compaction_trigger() {
        let mut level = Level::new(1, 1000);
        assert!(!level.needs_compaction());
        level.add_sstable(PathBuf::from("test.sst"), 500);
        assert!(!level.needs_compaction());
        level.add_sstable(PathBuf::from("test2.sst"), 600);
        assert!(level.needs_compaction());
    }

    // Thresholds grow geometrically: L1 = base, L2 = base * ratio.
    #[test]
    fn test_lsm_tree_creation() {
        let dir = tempdir().unwrap();
        let lsm = LSMTree::new(dir.path(), 10_000_000, 10, 7);
        assert_eq!(lsm.num_levels(), 7);
        assert_eq!(lsm.level(0).unwrap().level_num(), 0);
        assert_eq!(lsm.level(1).unwrap().size_threshold, 10_000_000);
        assert_eq!(lsm.level(2).unwrap().size_threshold, 100_000_000);
    }

    // L0 compaction is triggered by file count (4), not size.
    #[test]
    fn test_l0_compaction_trigger() {
        let dir = tempdir().unwrap();
        let mut lsm = LSMTree::new(dir.path(), 10_000_000, 10, 7);
        assert!(lsm.needs_compaction().is_none());
        for i in 0..4 {
            lsm.add_l0_sstable(PathBuf::from(format!("test{}.sst", i)), 1000);
        }
        assert_eq!(lsm.needs_compaction(), Some(0));
    }

    // Deeper levels trigger on size alone.
    #[test]
    fn test_level_size_compaction_trigger() {
        let dir = tempdir().unwrap();
        let mut lsm = LSMTree::new(dir.path(), 1000, 10, 7);
        lsm.levels[1].add_sstable(PathBuf::from("test.sst"), 1200);
        assert_eq!(lsm.needs_compaction(), Some(1));
    }

    // Two disjoint tables merge into one containing all four keys.
    #[test]
    fn test_compact_sstables() {
        use crate::sstable::SSTableBuilder;
        let dir = tempdir().unwrap();
        let path1 = dir.path().join("input1.sst");
        let mut builder1 = SSTableBuilder::create(&path1).unwrap();
        builder1
            .add(Bytes::from("key1"), Bytes::from("value1"))
            .unwrap();
        builder1
            .add(Bytes::from("key3"), Bytes::from("value3"))
            .unwrap();
        builder1.finish().unwrap();
        let path2 = dir.path().join("input2.sst");
        let mut builder2 = SSTableBuilder::create(&path2).unwrap();
        builder2
            .add(Bytes::from("key2"), Bytes::from("value2"))
            .unwrap();
        builder2
            .add(Bytes::from("key4"), Bytes::from("value4"))
            .unwrap();
        builder2.finish().unwrap();
        let output_path = dir.path().join("output.sst");
        let (result_path, size) =
            compact_sstables(&[path1, path2], &output_path, 0, None, u64::MAX).unwrap();
        assert_eq!(result_path, output_path);
        assert!(size > 0);
        let mut output_sst = SSTable::open(&output_path).unwrap();
        assert_eq!(output_sst.len(), 4);
        assert_eq!(
            output_sst.get(b"key1").unwrap(),
            Some(Bytes::from("value1"))
        );
        assert_eq!(
            output_sst.get(b"key2").unwrap(),
            Some(Bytes::from("value2"))
        );
        assert_eq!(
            output_sst.get(b"key3").unwrap(),
            Some(Bytes::from("value3"))
        );
        assert_eq!(
            output_sst.get(b"key4").unwrap(),
            Some(Bytes::from("value4"))
        );
    }

    // When both inputs contain the same key, the merge keeps the newer
    // value (from the later table) and drops the older one.
    #[test]
    fn test_compact_with_duplicates() {
        use crate::sstable::SSTableBuilder;
        let dir = tempdir().unwrap();
        let path1 = dir.path().join("input1.sst");
        let mut builder1 = SSTableBuilder::create(&path1).unwrap();
        builder1
            .add(Bytes::from("key1"), Bytes::from("old_value"))
            .unwrap();
        builder1
            .add(Bytes::from("key2"), Bytes::from("value2"))
            .unwrap();
        builder1.finish().unwrap();
        let path2 = dir.path().join("input2.sst");
        let mut builder2 = SSTableBuilder::create(&path2).unwrap();
        builder2
            .add(Bytes::from("key1"), Bytes::from("new_value"))
            .unwrap();
        builder2.finish().unwrap();
        let output_path = dir.path().join("output.sst");
        compact_sstables(&[path1, path2], &output_path, 0, None, u64::MAX).unwrap();
        let mut output_sst = SSTable::open(&output_path).unwrap();
        assert_eq!(output_sst.len(), 2);
        assert_eq!(
            output_sst.get(b"key1").unwrap(),
            Some(Bytes::from("new_value"))
        );
        assert_eq!(
            output_sst.get(b"key2").unwrap(),
            Some(Bytes::from("value2"))
        );
    }

    // sqrt(1.5 * w / r): 100:1 write-heavy -> 12; read-heavy clamps to min.
    #[test]
    fn test_adaptive_compaction_strategy() {
        let mut strategy = CompactionStrategy::Adaptive {
            current_ratio: 8,
            min_ratio: 4,
            max_ratio: 20,
        };
        strategy.adjust_for_workload(100_000, 1000);
        let ratio = strategy.current_ratio();
        assert!(
            ratio > 8,
            "Write-heavy should increase ratio, got {}",
            ratio
        );
        assert!(ratio <= 20, "Should respect max ratio");
        assert_eq!(ratio, 12, "Expected ratio ~12 for 100:1 w:r");
        strategy.adjust_for_workload(101_000, 10_100_000);
        let ratio = strategy.current_ratio();
        assert!(ratio < 8, "Read-heavy should decrease ratio, got {}", ratio);
        assert_eq!(ratio, 4, "Should clamp to min_ratio for read-heavy");
    }

    // The tree starts at midpoint(4, 20) = 12 and recomputes thresholds
    // when the ratio changes; L1 stays at base (ratio^0).
    #[test]
    fn test_lsm_adaptive_workload_adjustment() {
        let dir = tempdir().unwrap();
        let mut lsm = LSMTree::new_adaptive(dir.path(), 1000, 7, 4, 20);
        assert_eq!(lsm.strategy().current_ratio(), 12);
        let adjusted = lsm.adjust_for_workload(200_000, 1000);
        assert!(adjusted, "Should adjust on significant workload change");
        let new_ratio = lsm.strategy().current_ratio();
        assert!(new_ratio > 12, "Write-heavy should increase ratio");
        assert_eq!(new_ratio, 17, "Expected ratio ~17 for 200:1 w:r");
        let l1_threshold = lsm.level(1).unwrap().size_threshold;
        assert_eq!(l1_threshold, 1000);
        let l2_threshold = lsm.level(2).unwrap().size_threshold;
        assert_eq!(l2_threshold, 1000 * new_ratio); }

    // Fixed strategies ignore workload feedback entirely.
    #[test]
    fn test_fixed_strategy_doesnt_adapt() {
        let strategy = CompactionStrategy::Fixed(10);
        assert_eq!(strategy.current_ratio(), 10);
        let mut strategy_mut = strategy;
        strategy_mut.adjust_for_workload(10000, 1000);
        assert_eq!(strategy_mut.current_ratio(), 10);
    }
}