#[cfg(test)]
mod tests;
pub mod major;
pub mod minor;
pub mod tombstone;
use std::sync::Arc;
use crate::engine::EngineConfig;
use crate::sstable::SSTable;
use crate::compaction::{CompactionError, CompactionResult, CompactionStrategy};
use crate::manifest::Manifest;
pub fn bucket_sstables(sstables: &[Arc<SSTable>], config: &EngineConfig) -> Vec<Vec<usize>> {
if sstables.is_empty() {
return Vec::new();
}
let mut indices: Vec<usize> = (0..sstables.len()).collect();
indices.sort_by_key(|&i| sstables[i].file_size());
let mut small_bucket: Vec<usize> = Vec::new();
let mut regular: Vec<usize> = Vec::new();
for &idx in &indices {
if sstables[idx].file_size() < config.min_sstable_size as u64 {
small_bucket.push(idx);
} else {
regular.push(idx);
}
}
let mut buckets: Vec<Vec<usize>> = Vec::new();
if !small_bucket.is_empty() {
buckets.push(small_bucket);
}
let mut current_bucket: Vec<usize> = Vec::new();
let mut current_avg: f64 = 0.0;
for &idx in ®ular {
let size = sstables[idx].file_size() as f64;
if current_bucket.is_empty() {
current_bucket.push(idx);
current_avg = size;
} else {
let low = current_avg * config.bucket_low;
let high = current_avg * config.bucket_high;
if size >= low && size <= high {
current_bucket.push(idx);
let total: f64 = current_bucket
.iter()
.map(|&i| sstables[i].file_size() as f64)
.sum();
current_avg = total / current_bucket.len() as f64;
} else {
buckets.push(std::mem::take(&mut current_bucket));
current_bucket.push(idx);
current_avg = size;
}
}
}
if !current_bucket.is_empty() {
buckets.push(current_bucket);
}
buckets
}
/// Chooses which bucket to compact next.
///
/// A bucket is eligible when it is non-empty and holds at least
/// `config.min_threshold` entries. Among eligible buckets the one with the most
/// entries wins; when several share that size, the earliest one in `buckets`
/// is kept.
///
/// Returns a copy of the winning bucket truncated to its first
/// `config.max_threshold` entries, or `None` when no bucket is eligible.
pub fn select_compaction_bucket(
    buckets: &[Vec<usize>],
    config: &EngineConfig,
) -> Option<Vec<usize>> {
    buckets
        .iter()
        .filter(|candidate| !candidate.is_empty() && candidate.len() >= config.min_threshold)
        .fold(None::<&Vec<usize>>, |winner, candidate| match winner {
            // Only a strictly larger bucket displaces the current winner, so
            // the first bucket of the maximum size is the one that survives.
            Some(current) if current.len() >= candidate.len() => winner,
            _ => Some(candidate),
        })
        .map(|chosen| {
            chosen
                .iter()
                .copied()
                .take(config.max_threshold)
                .collect()
        })
}
/// Stateless [`CompactionStrategy`] that forwards every request to
/// [`minor::maybe_compact`].
pub struct MinorCompaction;
impl CompactionStrategy for MinorCompaction {
/// Delegates to [`minor::maybe_compact`], passing all arguments through
/// unchanged and returning its result as-is.
///
/// NOTE(review): `Ok(None)` presumably means no compaction was performed —
/// confirm against `minor::maybe_compact`.
fn compact(
&self,
sstables: &[Arc<SSTable>],
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
minor::maybe_compact(sstables, manifest, data_dir, config)
}
}
/// Stateless [`CompactionStrategy`] that forwards every request to
/// [`tombstone::maybe_compact`].
pub struct TombstoneCompaction;
impl CompactionStrategy for TombstoneCompaction {
/// Delegates to [`tombstone::maybe_compact`], passing all arguments through
/// unchanged and returning its result as-is.
///
/// NOTE(review): `Ok(None)` presumably means no compaction was performed —
/// confirm against `tombstone::maybe_compact`.
fn compact(
&self,
sstables: &[Arc<SSTable>],
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
tombstone::maybe_compact(sstables, manifest, data_dir, config)
}
}
/// Stateless [`CompactionStrategy`] that forwards every request to
/// [`major::compact`].
pub struct MajorCompaction;
impl CompactionStrategy for MajorCompaction {
/// Delegates to [`major::compact`], passing all arguments through unchanged
/// and returning its result as-is.
///
/// Unlike the other strategies in this module, the delegate is named
/// `compact` rather than `maybe_compact`.
/// NOTE(review): this suggests it always attempts a compaction — confirm
/// against `major::compact`.
fn compact(
&self,
sstables: &[Arc<SSTable>],
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
major::compact(sstables, manifest, data_dir, config)
}
}