use crate::error::{AmateRSError, ErrorContext, Result};
use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
use crate::types::{CipherBlob, Key};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Which compaction algorithm the engine runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Leveled compaction: per-level size targets with an L0 file-count trigger.
    LevelBased,
    /// Size-tiered compaction: merge groups of similarly sized SSTables.
    SizeTiered,
}

/// Tuning knobs for compaction planning, write throttling, and tombstone GC.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Strategy used by the planner.
    pub strategy: CompactionStrategy,
    /// Number of L0 SSTables that triggers an L0 compaction.
    pub l0_threshold: usize,
    /// Size growth factor between consecutive levels.
    pub level_multiplier: usize,
    /// Target size of level 1, in bytes.
    pub base_level_size: u64,
    /// Upper bound on bytes selected into one compaction task.
    pub max_compaction_bytes: u64,
    /// SSTables smaller than this are ignored by size-tiered grouping.
    pub min_sstable_size: u64,
    /// Maximum size ratio between members of one size tier.
    pub size_ratio: f64,
    /// Minimum number of SSTables in a tier before it is compacted.
    pub min_tier_size: usize,
    /// Compaction write-rate limit in bytes/sec; 0 disables throttling.
    pub max_compaction_bytes_per_sec: u64,
    /// How long a tombstone must live before it may be dropped.
    pub tombstone_ttl: Duration,
}

impl Default for CompactionConfig {
    /// Conservative defaults: leveled strategy, 10 MiB base level,
    /// unthrottled writes, and a one-week tombstone TTL.
    fn default() -> Self {
        Self {
            strategy: CompactionStrategy::LevelBased,
            l0_threshold: 4,
            level_multiplier: 10,
            base_level_size: 10 * 1024 * 1024,
            max_compaction_bytes: 100 * 1024 * 1024,
            min_sstable_size: 1024,
            size_ratio: 2.0,
            min_tier_size: 4,
            max_compaction_bytes_per_sec: 0,
            tombstone_ttl: Duration::from_secs(7 * 24 * 3600),
        }
    }
}
/// A unit of compaction work: merge `source_sstables` from `source_level`
/// together with the overlapping `target_sstables`, writing the result to
/// `target_level`.
#[derive(Debug, Clone)]
pub struct CompactionTask {
    // Level the inputs are drawn from.
    pub source_level: usize,
    // Level the merged output is written to (source_level + 1 for leveled plans).
    pub target_level: usize,
    // SSTables being compacted out of the source level.
    pub source_sstables: Vec<SSTableMetadata>,
    // SSTables at the target level whose key ranges overlap the sources.
    pub target_sstables: Vec<SSTableMetadata>,
}
/// Cumulative compaction counters, shared across threads via `Arc` and
/// updated with relaxed atomic operations (they are diagnostics, not
/// synchronization points).
///
/// `Default` is derived: `AtomicU64::default()` is zero, which makes the
/// previous hand-written all-zero constructor redundant.
#[derive(Default)]
pub struct CompactionStats {
    /// Bytes read from input SSTables.
    pub bytes_read: AtomicU64,
    /// Bytes written to output SSTables.
    pub bytes_written: AtomicU64,
    /// Number of input files merged across all compactions.
    pub files_merged: AtomicU64,
    /// Number of compactions that ran to completion.
    pub compactions_completed: AtomicU64,
    /// Total wall-clock time spent compacting, in milliseconds.
    pub total_duration_ms: AtomicU64,
    /// Number of key/value entries processed during merges.
    pub keys_processed: AtomicU64,
    /// Number of tombstone entries dropped during merges.
    pub tombstones_removed: AtomicU64,
}

impl std::fmt::Debug for CompactionStats {
    /// Renders from a single snapshot so all fields come from one pass of
    /// relaxed loads; the output format matches a derived-style debug struct.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = self.snapshot();
        f.debug_struct("CompactionStats")
            .field("bytes_read", &s.bytes_read)
            .field("bytes_written", &s.bytes_written)
            .field("files_merged", &s.files_merged)
            .field("compactions_completed", &s.compactions_completed)
            .field("total_duration_ms", &s.total_duration_ms)
            .field("keys_processed", &s.keys_processed)
            .field("tombstones_removed", &s.tombstones_removed)
            .finish()
    }
}

impl CompactionStats {
    /// Copies every counter into a plain-integer snapshot.
    ///
    /// Loads are relaxed and not atomic as a group, so concurrent updates
    /// may be partially reflected across fields.
    pub fn snapshot(&self) -> CompactionStatsSnapshot {
        CompactionStatsSnapshot {
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            files_merged: self.files_merged.load(Ordering::Relaxed),
            compactions_completed: self.compactions_completed.load(Ordering::Relaxed),
            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
            keys_processed: self.keys_processed.load(Ordering::Relaxed),
            tombstones_removed: self.tombstones_removed.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time, plain-integer copy of [`CompactionStats`].
#[derive(Debug, Clone, Default)]
pub struct CompactionStatsSnapshot {
    pub bytes_read: u64,
    pub bytes_written: u64,
    pub files_merged: u64,
    pub compactions_completed: u64,
    pub total_duration_ms: u64,
    pub keys_processed: u64,
    pub tombstones_removed: u64,
}
/// Rate limiter for compaction writes.
///
/// Tracks bytes recorded in the current one-second accounting window and
/// sleeps the calling thread whenever the observed rate runs ahead of
/// `max_bytes_per_sec`. A limit of zero disables throttling entirely.
#[derive(Debug)]
pub struct CompactionThrottler {
    max_bytes_per_sec: u64,
    bytes_in_window: u64,
    window_start: Instant,
}

impl CompactionThrottler {
    /// Creates a throttler; `max_bytes_per_sec == 0` means "no limit".
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            bytes_in_window: 0,
            window_start: Instant::now(),
        }
    }

    /// Records `bytes_written` and blocks long enough to keep the average
    /// rate at or below the configured limit. No-op when disabled.
    pub fn throttle(&mut self, bytes_written: u64) {
        if !self.is_enabled() {
            return;
        }
        self.bytes_in_window += bytes_written;
        let actual_secs = self.window_start.elapsed().as_secs_f64();
        // How long the window *should* have taken at the configured rate.
        let budget_secs = self.bytes_in_window as f64 / self.max_bytes_per_sec as f64;
        if budget_secs > actual_secs {
            std::thread::sleep(Duration::from_secs_f64(budget_secs - actual_secs));
        }
        // Roll the accounting window once a second of wall time has elapsed.
        if actual_secs >= 1.0 {
            self.bytes_in_window = 0;
            self.window_start = Instant::now();
        }
    }

    /// Whether a rate limit is configured.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes_per_sec > 0
    }
}
/// A group of SSTables of similar file size, produced by
/// size-tiered grouping.
#[derive(Debug, Clone)]
pub struct SizeTier {
    // Members of this tier, in ascending file-size order.
    pub sstables: Vec<SSTableMetadata>,
    // Mean file size of the members, in bytes.
    pub avg_size: u64,
}
/// Decides *what* to compact (which SSTables, into which level) based on
/// a [`CompactionConfig`]; it performs no I/O itself.
pub struct CompactionPlanner {
    config: CompactionConfig,
}
impl CompactionPlanner {
    /// Creates a planner over `config`.
    pub fn new(config: CompactionConfig) -> Self {
        Self { config }
    }

    /// L0 is compacted once it accumulates `l0_threshold` SSTables.
    /// The trigger is count-based (not size-based) for L0.
    pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
        l0_sstable_count >= self.config.l0_threshold
    }

    /// Whether a non-zero level has outgrown its size target.
    /// Always `false` for L0, which is governed by `needs_l0_compaction`.
    pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
        if level == 0 {
            return false;
        }
        level_size > self.level_target_size(level)
    }

    /// Target byte size for `level`: `base_level_size * multiplier^(level - 1)`,
    /// and 0 for L0 (no size target).
    ///
    /// Uses checked arithmetic and saturates at `u64::MAX` for deep levels;
    /// the previous unchecked `pow`/`*` could panic in debug builds or wrap
    /// in release builds.
    pub fn level_target_size(&self, level: usize) -> u64 {
        if level == 0 {
            return 0;
        }
        let exponent = u32::try_from(level - 1).unwrap_or(u32::MAX);
        (self.config.level_multiplier as u64)
            .checked_pow(exponent)
            .and_then(|factor| self.config.base_level_size.checked_mul(factor))
            .unwrap_or(u64::MAX)
    }

    /// Builds a leveled-compaction task moving data from `source_level`
    /// into `source_level + 1`.
    ///
    /// For L0 every source SSTable is taken (L0 files may overlap, so a
    /// partial pick would be incorrect); for other levels a bounded subset
    /// is selected. Returns `None` when there is nothing to compact.
    pub fn plan_compaction(
        &self,
        source_level: usize,
        source_sstables: Vec<SSTableMetadata>,
        target_sstables: Vec<SSTableMetadata>,
    ) -> Option<CompactionTask> {
        if source_sstables.is_empty() {
            return None;
        }
        let source_to_compact = if source_level == 0 {
            source_sstables
        } else {
            self.select_sstables_for_compaction(source_sstables)
        };
        if source_to_compact.is_empty() {
            return None;
        }
        let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
        Some(CompactionTask {
            source_level,
            target_level: source_level + 1,
            source_sstables: source_to_compact,
            target_sstables: target_to_merge,
        })
    }

    /// Size-tiered planning: returns a task for the first tier holding at
    /// least `min_tier_size` SSTables, targeting one level above the
    /// deepest member. Returns `None` when no tier is large enough.
    pub fn plan_size_tiered_compaction(
        &self,
        sstables: Vec<SSTableMetadata>,
    ) -> Option<CompactionTask> {
        let tiers = self.group_by_size_tier(sstables);
        for tier in tiers {
            if tier.sstables.len() >= self.config.min_tier_size {
                let max_level = tier.sstables.iter().map(|s| s.level).max().unwrap_or(0);
                return Some(CompactionTask {
                    source_level: max_level,
                    target_level: max_level + 1,
                    source_sstables: tier.sstables,
                    target_sstables: Vec::new(),
                });
            }
        }
        None
    }

    /// Groups SSTables into tiers of similar size.
    ///
    /// Tables smaller than `min_sstable_size` are discarded. After sorting
    /// by size, a table joins the current tier while its size is within
    /// `size_ratio` of the tier's smallest member; otherwise a new tier
    /// starts at that table's size.
    pub fn group_by_size_tier(&self, mut sstables: Vec<SSTableMetadata>) -> Vec<SizeTier> {
        if sstables.is_empty() {
            return Vec::new();
        }
        sstables.sort_by_key(|s| s.file_size);
        let sstables: Vec<SSTableMetadata> = sstables
            .into_iter()
            .filter(|s| s.file_size >= self.config.min_sstable_size)
            .collect();
        if sstables.is_empty() {
            return Vec::new();
        }
        let mut tiers: Vec<SizeTier> = Vec::new();
        let mut current_tier: Vec<SSTableMetadata> = Vec::new();
        let mut tier_min_size: u64 = 0;
        for sstable in sstables {
            if current_tier.is_empty() {
                tier_min_size = sstable.file_size;
                current_tier.push(sstable);
            } else if (sstable.file_size as f64) <= (tier_min_size as f64 * self.config.size_ratio)
            {
                current_tier.push(sstable);
            } else {
                // Close out the current tier and start a new one at this size.
                let avg_size = Self::average_size(&current_tier);
                tiers.push(SizeTier {
                    sstables: std::mem::take(&mut current_tier),
                    avg_size,
                });
                tier_min_size = sstable.file_size;
                current_tier.push(sstable);
            }
        }
        if !current_tier.is_empty() {
            let avg_size = Self::average_size(&current_tier);
            tiers.push(SizeTier {
                sstables: current_tier,
                avg_size,
            });
        }
        tiers
    }

    /// Mean file size of a tier (division guarded against an empty slice).
    fn average_size(sstables: &[SSTableMetadata]) -> u64 {
        let total: u64 = sstables.iter().map(|s| s.file_size).sum();
        total / sstables.len().max(1) as u64
    }

    /// Picks a bounded prefix of `sstables` for a non-L0 compaction:
    /// stops before exceeding `max_compaction_bytes` and caps the pick at
    /// two tables to keep individual compactions small.
    fn select_sstables_for_compaction(
        &self,
        sstables: Vec<SSTableMetadata>,
    ) -> Vec<SSTableMetadata> {
        let mut selected = Vec::new();
        let mut total_size = 0u64;
        for sstable in sstables {
            if total_size + sstable.file_size > self.config.max_compaction_bytes {
                break;
            }
            total_size += sstable.file_size;
            selected.push(sstable);
            if selected.len() >= 2 {
                break;
            }
        }
        selected
    }

    /// Returns the target-level SSTables whose `[min_key, max_key]` range
    /// intersects the combined key range of `source_sstables`.
    pub fn find_overlapping_sstables(
        &self,
        source_sstables: &[SSTableMetadata],
        target_sstables: &[SSTableMetadata],
    ) -> Vec<SSTableMetadata> {
        if source_sstables.is_empty() {
            return Vec::new();
        }
        let min_key = source_sstables
            .iter()
            .map(|s| &s.min_key)
            .min()
            .expect("source_sstables is non-empty");
        let max_key = source_sstables
            .iter()
            .map(|s| &s.max_key)
            .max()
            .expect("source_sstables is non-empty");
        target_sstables
            .iter()
            .filter(|sstable| !(&sstable.max_key < min_key || &sstable.min_key > max_key))
            .cloned()
            .collect()
    }
}
/// A deletion marker paired with its creation time.
/// NOTE(review): nothing in this file constructs or consumes this type —
/// `CompactionExecutor` tracks tombstones as `BTreeMap<Key, Instant>`
/// instead. Confirm whether it is used elsewhere or is dead code.
#[derive(Debug, Clone)]
pub struct TombstoneEntry {
    // The deleted key.
    pub key: Key,
    // When the deletion was recorded; used for TTL-based GC.
    pub created_at: Instant,
}
/// Carries out compaction tasks: reads input SSTables, merges their
/// entries, drops expired tombstones, and writes throttled output files.
pub struct CompactionExecutor {
    // Writer configuration cloned into each output SSTable.
    config: SSTableConfig,
    // Planning/GC/throttle settings.
    compaction_config: CompactionConfig,
    // Shared counters; `Arc` so callers can observe them concurrently.
    stats: Arc<CompactionStats>,
    // Rate limiter applied after each output file is written.
    throttler: CompactionThrottler,
    // Registered deletion markers keyed by deleted key, valued by deletion time.
    tombstones: BTreeMap<Key, Instant>,
}
impl CompactionExecutor {
    /// Creates an executor with default compaction settings and no throttling.
    pub fn new(config: SSTableConfig) -> Self {
        Self {
            config,
            compaction_config: CompactionConfig::default(),
            stats: Arc::new(CompactionStats::default()),
            throttler: CompactionThrottler::new(0),
            tombstones: BTreeMap::new(),
        }
    }

    /// Creates an executor with explicit compaction settings; the write
    /// throttler is derived from `max_compaction_bytes_per_sec`.
    pub fn with_compaction_config(
        config: SSTableConfig,
        compaction_config: CompactionConfig,
    ) -> Self {
        let throttler = CompactionThrottler::new(compaction_config.max_compaction_bytes_per_sec);
        Self {
            config,
            compaction_config,
            stats: Arc::new(CompactionStats::default()),
            throttler,
            tombstones: BTreeMap::new(),
        }
    }

    /// Records when `key` was deleted so its tombstone can be dropped once
    /// `tombstone_ttl` has elapsed.
    pub fn register_tombstone(&mut self, key: Key, created_at: Instant) {
        self.tombstones.insert(key, created_at);
    }

    /// True when `key` has a registered tombstone older than the TTL.
    /// Unknown keys are reported as not expired.
    fn is_tombstone_expired(&self, key: &Key) -> bool {
        self.tombstones
            .get(key)
            .map(|created_at| created_at.elapsed() >= self.compaction_config.tombstone_ttl)
            .unwrap_or(false)
    }

    /// Merges the task's SSTables and writes the result to `task.target_level`.
    ///
    /// Fix: target-level (older) tables are loaded *first* and source-level
    /// (newer, lower-level) tables *second*, so that on key collisions the
    /// newer value overwrites the older one in the merge map. The previous
    /// order loaded sources first, letting stale target values shadow fresh
    /// source values. (Recency *among* overlapping L0 files is still just
    /// their order in `source_sstables` — confirm callers supply oldest-first.)
    ///
    /// Returns metadata for the newly written SSTables; `next_sstable_id`
    /// is advanced once per output file created.
    pub fn execute_compaction(
        &mut self,
        task: CompactionTask,
        output_dir: &Path,
        next_sstable_id: &mut u64,
    ) -> Result<Vec<SSTableMetadata>> {
        let start_time = Instant::now();
        let files_merged = (task.source_sstables.len() + task.target_sstables.len()) as u64;
        self.stats
            .files_merged
            .fetch_add(files_merged, Ordering::Relaxed);
        let mut all_entries: BTreeMap<Key, Option<CipherBlob>> = BTreeMap::new();
        // Older data first (target level), newer data second (source level),
        // so later inserts — the newer values — win on duplicate keys.
        for sstable in task
            .target_sstables
            .iter()
            .chain(task.source_sstables.iter())
        {
            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
            self.stats
                .bytes_read
                .fetch_add(sstable.file_size, Ordering::Relaxed);
        }
        let output_sstables = self.write_compacted_sstables(
            all_entries,
            task.target_level,
            output_dir,
            next_sstable_id,
        )?;
        self.stats
            .compactions_completed
            .fetch_add(1, Ordering::Relaxed);
        let duration_ms = start_time.elapsed().as_millis() as u64;
        self.stats
            .total_duration_ms
            .fetch_add(duration_ms, Ordering::Relaxed);
        Ok(output_sstables)
    }

    /// Loads every entry of the SSTable at `path` into `entries`,
    /// overwriting any value already present for the same key.
    ///
    /// Note: the reader yields `(Key, CipherBlob)` pairs, so every loaded
    /// value is `Some(..)` — deletions never surface as `None` from disk.
    fn read_sstable_entries(
        &self,
        path: &Path,
        entries: &mut BTreeMap<Key, Option<CipherBlob>>,
    ) -> Result<()> {
        let reader = SSTableReader::open(path)?;
        let sstable_entries = reader.iter()?;
        for (key, value) in sstable_entries {
            self.stats.keys_processed.fetch_add(1, Ordering::Relaxed);
            entries.insert(key, Some(value));
        }
        Ok(())
    }

    /// Seals `writer`, accounts the finished file's size against stats and
    /// the throttler, and appends its metadata to `output`.
    /// Extracted to remove the duplicated seal sequence that previously
    /// appeared twice in `write_compacted_sstables`.
    fn finalize_sstable(
        &mut self,
        writer: SSTableWriter,
        path: PathBuf,
        min_key: Key,
        max_key: Key,
        num_entries: usize,
        target_level: usize,
        output: &mut Vec<SSTableMetadata>,
    ) -> Result<()> {
        writer.finish()?;
        let file_size = std::fs::metadata(&path)
            .map_err(|e| {
                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                    "Failed to get SSTable size: {}",
                    e
                )))
            })?
            .len();
        self.stats
            .bytes_written
            .fetch_add(file_size, Ordering::Relaxed);
        self.throttler.throttle(file_size);
        output.push(SSTableMetadata {
            path,
            min_key,
            max_key,
            num_entries,
            file_size,
            level: target_level,
        });
        Ok(())
    }

    /// Streams the merged entries into one or more output SSTables, rotating
    /// to a new file whenever the running (estimated, pre-write) size reaches
    /// `MAX_SSTABLE_SIZE`.
    fn write_compacted_sstables(
        &mut self,
        entries: BTreeMap<Key, Option<CipherBlob>>,
        target_level: usize,
        output_dir: &Path,
        next_id: &mut u64,
    ) -> Result<Vec<SSTableMetadata>> {
        let mut output_sstables = Vec::new();
        let mut current_writer: Option<SSTableWriter> = None;
        let mut current_path: Option<PathBuf> = None;
        let mut current_size = 0usize;
        let mut current_min_key: Option<Key> = None;
        let mut current_max_key: Option<Key> = None;
        let mut current_entries = 0usize;
        // Approximate per-file size cap for compaction output.
        const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024;
        for (key, value_opt) in entries {
            let value = match value_opt {
                Some(v) => v,
                None => {
                    // Tombstone: count it as removed and skip writing it.
                    // NOTE(review): non-expired tombstones are not carried
                    // forward to the output level either — confirm intended.
                    // With the current `read_sstable_entries` this branch is
                    // unreachable, since loaded values are always `Some`.
                    if self.is_tombstone_expired(&key) {
                        self.tombstones.remove(&key);
                    }
                    self.stats
                        .tombstones_removed
                        .fetch_add(1, Ordering::Relaxed);
                    continue;
                }
            };
            if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
                // Seal the previous output file, if any, before rotating.
                if let Some(writer) = current_writer.take() {
                    match (
                        current_path.take(),
                        current_min_key.take(),
                        current_max_key.take(),
                    ) {
                        (Some(path), Some(min_key), Some(max_key)) => {
                            self.finalize_sstable(
                                writer,
                                path,
                                min_key,
                                max_key,
                                current_entries,
                                target_level,
                                &mut output_sstables,
                            )?;
                        }
                        // Writer with no recorded keys: finish it but emit no metadata.
                        _ => {
                            writer.finish()?;
                        }
                    }
                }
                let id = *next_id;
                *next_id += 1;
                let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
                current_writer = Some(SSTableWriter::new(&path, self.config.clone())?);
                current_path = Some(path);
                current_size = 0;
                current_min_key = None;
                current_max_key = None;
                current_entries = 0;
            }
            if let Some(ref mut writer) = current_writer {
                // Rough on-disk footprint: 16 bytes of framing plus key and value.
                let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
                writer.add(key.clone(), value)?;
                current_size += entry_size;
                current_entries += 1;
                // Keys arrive in sorted order from the BTreeMap, so the first
                // key of a file is its minimum and the latest is its maximum.
                if current_min_key.is_none() {
                    current_min_key = Some(key.clone());
                }
                current_max_key = Some(key);
            }
        }
        // Seal the trailing output file.
        if let Some(writer) = current_writer {
            match (current_path, current_min_key, current_max_key) {
                (Some(path), Some(min_key), Some(max_key)) => {
                    self.finalize_sstable(
                        writer,
                        path,
                        min_key,
                        max_key,
                        current_entries,
                        target_level,
                        &mut output_sstables,
                    )?;
                }
                _ => {
                    writer.finish()?;
                }
            }
        }
        Ok(output_sstables)
    }

    /// Shared cumulative counters for this executor.
    pub fn stats(&self) -> &CompactionStats {
        &self.stats
    }

    /// Point-in-time copy of the counters.
    pub fn stats_snapshot(&self) -> CompactionStatsSnapshot {
        self.stats.snapshot()
    }
}
// Unit tests for compaction planning, statistics, throttling, and execution.
#[cfg(test)]
mod tests {
    use super::*;

    // L0 compaction triggers exactly at the configured file-count threshold.
    #[test]
    fn test_compaction_planner_l0_threshold() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        assert!(!planner.needs_l0_compaction(3));
        assert!(planner.needs_l0_compaction(4));
        assert!(planner.needs_l0_compaction(5));
    }

    // Level targets grow by level_multiplier per level: 10 MiB, 100 MiB, 1000 MiB.
    #[test]
    fn test_compaction_planner_level_sizes() {
        let config = CompactionConfig {
            base_level_size: 10 * 1024 * 1024,
            level_multiplier: 10,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024);
        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024);
        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024);
    }

    // Size trigger never fires for L0; for L1 it fires only above the target.
    #[test]
    fn test_compaction_planner_needs_compaction() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));
        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
    }

    // Only the target table whose key range intersects the source range is returned.
    #[test]
    fn test_find_overlapping_sstables() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("key_005"),
            max_key: Key::from_str("key_015"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];
        let target = vec![
            SSTableMetadata {
                path: PathBuf::from("t1.sst"),
                min_key: Key::from_str("key_000"),
                max_key: Key::from_str("key_010"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
            SSTableMetadata {
                path: PathBuf::from("t2.sst"),
                min_key: Key::from_str("key_020"),
                max_key: Key::from_str("key_030"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
        ];
        let overlapping = planner.find_overlapping_sstables(&source, &target);
        assert_eq!(overlapping.len(), 1);
        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
    }

    // Sizes 1000..1800 all fall within ratio 2.0 of the smallest -> one tier.
    #[test]
    fn test_size_tiered_grouping_basic() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];
        let tiers = planner.group_by_size_tier(sstables);
        assert_eq!(tiers.len(), 1);
        assert_eq!(tiers[0].sstables.len(), 4);
    }

    // 10000 > 1000 * 2.0, so the big tables split into a second tier.
    #[test]
    fn test_size_tiered_grouping_multiple_tiers() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let sstables = vec![
            make_metadata("small1.sst", 1000, 0),
            make_metadata("small2.sst", 1500, 0),
            make_metadata("big1.sst", 10000, 0),
            make_metadata("big2.sst", 15000, 0),
        ];
        let tiers = planner.group_by_size_tier(sstables);
        assert_eq!(tiers.len(), 2);
        assert_eq!(tiers[0].sstables.len(), 2);
        assert_eq!(tiers[1].sstables.len(), 2);
    }

    // A tier only becomes a task once it reaches min_tier_size members.
    #[test]
    fn test_size_tiered_merge_trigger() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
        ];
        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_none(), "Should not trigger with only 3 SSTables");
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];
        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(
            task.is_some(),
            "Should trigger with 4 SSTables in same tier"
        );
        let task = task.expect("task should be Some");
        assert_eq!(task.source_sstables.len(), 4);
        assert_eq!(task.target_level, 1);
    }

    // Tables under min_sstable_size are filtered out before tiering.
    #[test]
    fn test_size_tiered_filters_small_sstables() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 500,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let sstables = vec![
            make_metadata("a.sst", 100, 0),
            make_metadata("b.sst", 200, 0),
            make_metadata("c.sst", 300, 0),
            make_metadata("d.sst", 400, 0),
        ];
        let tiers = planner.group_by_size_tier(sstables);
        assert!(tiers.is_empty());
    }

    // All counters start at zero.
    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 0);
        assert_eq!(snapshot.bytes_written, 0);
        assert_eq!(snapshot.files_merged, 0);
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.total_duration_ms, 0);
        assert_eq!(snapshot.keys_processed, 0);
        assert_eq!(snapshot.tombstones_removed, 0);
    }

    // fetch_add updates are reflected in the next snapshot.
    #[test]
    fn test_compaction_stats_atomic_updates() {
        let stats = CompactionStats::default();
        stats.bytes_read.fetch_add(1000, Ordering::Relaxed);
        stats.bytes_written.fetch_add(500, Ordering::Relaxed);
        stats.files_merged.fetch_add(3, Ordering::Relaxed);
        stats.compactions_completed.fetch_add(1, Ordering::Relaxed);
        stats.total_duration_ms.fetch_add(42, Ordering::Relaxed);
        stats.keys_processed.fetch_add(100, Ordering::Relaxed);
        stats.tombstones_removed.fetch_add(5, Ordering::Relaxed);
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.bytes_written, 500);
        assert_eq!(snapshot.files_merged, 3);
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.total_duration_ms, 42);
        assert_eq!(snapshot.keys_processed, 100);
        assert_eq!(snapshot.tombstones_removed, 5);
    }

    // Ten threads x 100 increments each: no updates lost under concurrency.
    #[test]
    fn test_compaction_stats_thread_safety() {
        let stats = Arc::new(CompactionStats::default());
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let stats_clone = Arc::clone(&stats);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        stats_clone.bytes_read.fetch_add(1, Ordering::Relaxed);
                        stats_clone.keys_processed.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().expect("thread should complete");
        }
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.keys_processed, 1000);
    }

    // A zero limit disables throttling: large writes return immediately.
    #[test]
    fn test_throttler_disabled() {
        let mut throttler = CompactionThrottler::new(0);
        assert!(!throttler.is_enabled());
        let start = Instant::now();
        throttler.throttle(1_000_000);
        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(50));
    }

    // 20_000 bytes at 10_000 B/s should sleep roughly 2s (>= 1.5s with slack).
    #[test]
    fn test_throttler_enabled() {
        let mut throttler = CompactionThrottler::new(10_000);
        assert!(throttler.is_enabled());
        let start = Instant::now();
        throttler.throttle(20_000);
        let elapsed = start.elapsed();
        assert!(
            elapsed >= Duration::from_millis(1500),
            "Expected >= 1.5s delay, got {:?}",
            elapsed
        );
    }

    // Writes far under the budget incur no measurable delay.
    #[test]
    fn test_throttler_small_writes_no_delay() {
        let mut throttler = CompactionThrottler::new(1_000_000);
        assert!(throttler.is_enabled());
        let start = Instant::now();
        throttler.throttle(100);
        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(50));
    }

    // A freshly registered tombstone is not expired under the default TTL.
    #[test]
    fn test_tombstone_registration() {
        let config = SSTableConfig::default();
        let mut executor = CompactionExecutor::new(config);
        let key = Key::from_str("test_key");
        let created_at = Instant::now();
        executor.register_tombstone(key.clone(), created_at);
        assert!(!executor.is_tombstone_expired(&key));
    }

    // A tombstone older than a 1ms TTL is reported expired.
    #[test]
    fn test_tombstone_expiry_with_short_ttl() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_millis(1),
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
        let key = Key::from_str("expired_key");
        let old_time = Instant::now() - Duration::from_millis(10);
        executor.register_tombstone(key.clone(), old_time);
        assert!(executor.is_tombstone_expired(&key));
    }

    // A fresh tombstone under a 1h TTL is not expired.
    #[test]
    fn test_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_secs(3600),
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
        let key = Key::from_str("fresh_key");
        executor.register_tombstone(key.clone(), Instant::now());
        assert!(!executor.is_tombstone_expired(&key));
    }

    // Unregistered keys are never considered expired.
    #[test]
    fn test_unknown_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let executor = CompactionExecutor::new(config);
        let key = Key::from_str("unknown_key");
        assert!(!executor.is_tombstone_expired(&key));
    }

    // No sources -> no task.
    #[test]
    fn test_plan_compaction_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        let task = planner.plan_compaction(0, Vec::new(), Vec::new());
        assert!(task.is_none());
    }

    // L0 takes every source table, even a single one.
    #[test]
    fn test_plan_compaction_single_sstable() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        let source = vec![SSTableMetadata {
            path: PathBuf::from("single.sst"),
            min_key: Key::from_str("key_001"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];
        let task = planner.plan_compaction(0, source, Vec::new());
        assert!(task.is_some());
        let task = task.expect("task should be Some for L0");
        assert_eq!(task.source_sstables.len(), 1);
    }

    // Empty input yields no size-tiered task.
    #[test]
    fn test_size_tiered_empty_input() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let task = planner.plan_size_tiered_compaction(Vec::new());
        assert!(task.is_none());
    }

    // An empty source range overlaps nothing.
    #[test]
    fn test_find_overlapping_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("key_000"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];
        let overlapping = planner.find_overlapping_sstables(&[], &target);
        assert!(overlapping.is_empty());
    }

    // Disjoint key ranges produce no overlap.
    #[test]
    fn test_find_overlapping_no_overlap() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("aaa"),
            max_key: Key::from_str("bbb"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];
        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("zzz_000"),
            max_key: Key::from_str("zzz_999"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];
        let overlapping = planner.find_overlapping_sstables(&source, &target);
        assert!(overlapping.is_empty());
    }

    // Pin the documented default configuration values.
    #[test]
    fn test_compaction_config_defaults() {
        let config = CompactionConfig::default();
        assert_eq!(config.strategy, CompactionStrategy::LevelBased);
        assert_eq!(config.l0_threshold, 4);
        assert_eq!(config.level_multiplier, 10);
        assert_eq!(config.min_sstable_size, 1024);
        assert_eq!(config.size_ratio, 2.0);
        assert_eq!(config.min_tier_size, 4);
        assert_eq!(config.max_compaction_bytes_per_sec, 0);
        assert_eq!(config.tombstone_ttl, Duration::from_secs(7 * 24 * 3600));
    }

    // A fresh executor exposes zeroed stats through the snapshot accessor.
    #[test]
    fn test_executor_stats_accessible() {
        let executor = CompactionExecutor::new(SSTableConfig::default());
        let snapshot = executor.stats_snapshot();
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.bytes_read, 0);
    }

    // Size-tiered tasks source from the deepest member level and target one below.
    #[test]
    fn test_size_tiered_preserves_level_info() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);
        let sstables = vec![
            make_metadata("a.sst", 1000, 2),
            make_metadata("b.sst", 1500, 2),
        ];
        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_some());
        let task = task.expect("task should be Some");
        assert_eq!(task.source_level, 2);
        assert_eq!(task.target_level, 3);
    }

    // L0 has no size target.
    #[test]
    fn test_level_target_size_l0() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        assert_eq!(planner.level_target_size(0), 0);
    }

    // End-to-end compaction of two real on-disk SSTables; verifies output
    // files are produced and every stats counter moves as expected.
    #[test]
    fn test_executor_compaction_with_stats() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_compaction_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
        let sstable_config = SSTableConfig::default();
        let path1 = temp_dir.join("L0_00000001.sst");
        let path2 = temp_dir.join("L0_00000002.sst");
        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );
        create_test_sstable(
            &path2,
            &sstable_config,
            &[("key_03", "val_03"), ("key_04", "val_04")],
        );
        let meta1 = make_file_metadata(&path1, 0);
        let meta2 = make_file_metadata(&path2, 0);
        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1, meta2],
            target_sstables: Vec::new(),
        };
        let mut executor = CompactionExecutor::new(sstable_config);
        let mut next_id = 100u64;
        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        assert!(result.is_ok(), "compaction should succeed");
        let output = result.expect("should have output");
        assert!(!output.is_empty(), "should produce output SSTables");
        let snapshot = executor.stats_snapshot();
        assert!(snapshot.bytes_read > 0, "should have read bytes");
        assert!(snapshot.bytes_written > 0, "should have written bytes");
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.files_merged, 2);
        assert!(
            snapshot.keys_processed >= 4,
            "should have processed at least 4 keys"
        );
        assert!(
            snapshot.total_duration_ms < 10_000,
            "should complete quickly"
        );
        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    // With throttling disabled (limit 0), compaction must not stall.
    #[test]
    fn test_executor_compaction_with_throttling() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_throttle_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
        let sstable_config = SSTableConfig::default();
        let path1 = temp_dir.join("L0_00000001.sst");
        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );
        let meta1 = make_file_metadata(&path1, 0);
        let compaction_config = CompactionConfig {
            max_compaction_bytes_per_sec: 0,
            ..Default::default()
        };
        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1],
            target_sstables: Vec::new(),
        };
        let mut executor =
            CompactionExecutor::with_compaction_config(sstable_config, compaction_config);
        let mut next_id = 200u64;
        let start = Instant::now();
        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        let elapsed = start.elapsed();
        assert!(result.is_ok());
        assert!(elapsed < Duration::from_secs(5));
        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    // Builds in-memory metadata with synthetic min/max keys derived from the name.
    fn make_metadata(name: &str, file_size: u64, level: usize) -> SSTableMetadata {
        SSTableMetadata {
            path: PathBuf::from(name),
            min_key: Key::from_str(&format!("{}_min", name)),
            max_key: Key::from_str(&format!("{}_max", name)),
            num_entries: 10,
            file_size,
            level,
        }
    }

    // Writes a real SSTable at `path` containing the given key/value pairs.
    fn create_test_sstable(path: &Path, config: &SSTableConfig, entries: &[(&str, &str)]) {
        let mut writer =
            SSTableWriter::new(path, config.clone()).expect("should create SSTable writer");
        for (k, v) in entries {
            let key = Key::from_str(k);
            let value = CipherBlob::new(v.as_bytes().to_vec());
            writer.add(key, value).expect("should add entry");
        }
        writer.finish().expect("should finish writing");
    }

    // Derives metadata for an existing on-disk SSTable by reading it back.
    fn make_file_metadata(path: &Path, level: usize) -> SSTableMetadata {
        let file_size = std::fs::metadata(path)
            .expect("SSTable file should exist")
            .len();
        let reader = SSTableReader::open(path).expect("should open SSTable");
        let entries = reader.iter().expect("should read entries");
        let min_key = entries
            .first()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));
        let max_key = entries
            .last()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));
        SSTableMetadata {
            path: path.to_path_buf(),
            min_key,
            max_key,
            num_entries: entries.len(),
            file_size,
            level,
        }
    }
}