use super::{bucket_sstables, select_compaction_bucket};
use crate::compaction::{
CompactionError, CompactionResult, MergeIterator, dedup_records, finalize_compaction,
full_range_scan_iters,
};
use crate::engine::EngineConfig;
use crate::manifest::Manifest;
use crate::sstable::SSTable;
use std::sync::Arc;
use tracing::{debug, info};
pub fn maybe_compact(
sstables: &[Arc<SSTable>],
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
let buckets = bucket_sstables(sstables, config);
let selected = match select_compaction_bucket(&buckets, config) {
Some(s) => s,
None => {
debug!(
sstable_count = sstables.len(),
"minor compaction: no bucket met threshold"
);
return Ok(None);
}
};
let selected_ids: Vec<u64> = selected.iter().map(|&i| sstables[i].id()).collect();
info!(
selected_count = selected.len(),
?selected_ids,
"minor compaction: starting merge"
);
let result = execute(sstables, &selected, manifest, data_dir)?;
info!(
new_sst_id = ?result.new_sst_id,
removed_count = result.removed_ids.len(),
"minor compaction: complete"
);
Ok(Some(result))
}
fn execute(
sstables: &[Arc<SSTable>],
selected_indices: &[usize],
manifest: &mut Manifest,
data_dir: &str,
) -> Result<CompactionResult, CompactionError> {
let selected_ssts: Vec<&SSTable> = selected_indices.iter().map(|&i| &*sstables[i]).collect();
let removed_ids: Vec<u64> = selected_ssts.iter().map(|s| s.id()).collect();
let iters = full_range_scan_iters(&selected_ssts)?;
let merge_iter = MergeIterator::new(iters);
let (point_entries, range_tombstones) = dedup_records(merge_iter);
finalize_compaction(
manifest,
data_dir,
removed_ids,
point_entries,
range_tombstones,
)
}