use crate::compaction::{
CompactionError, CompactionResult, MergeIterator, finalize_compaction, full_range_scan_iters,
};
use crate::engine::EngineConfig;
use crate::engine::RangeTombstone;
use crate::engine::utils::Record;
use crate::manifest::Manifest;
use crate::sstable::{PointEntry, SSTable};
use std::sync::Arc;
use tracing::{debug, info, trace};
/// Runs a major compaction that merges every SSTable in `sstables` into one.
///
/// Returns `Ok(None)` without touching anything when fewer than two SSTables
/// are supplied, because there is nothing to merge. Otherwise the merge is
/// delegated to [`execute`] and its result is returned as `Ok(Some(..))`.
///
/// `_config` is accepted for signature parity with callers but is not read
/// here.
///
/// # Errors
///
/// Propagates any [`CompactionError`] returned by [`execute`].
pub fn compact(
    sstables: &[Arc<SSTable>],
    manifest: &mut Manifest,
    data_dir: &str,
    _config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
    let sstable_count = sstables.len();

    // A full merge needs at least two inputs.
    if sstable_count < 2 {
        debug!(
            sstable_count,
            "major compaction: fewer than 2 SSTables, skipping"
        );
        return Ok(None);
    }

    let ids: Vec<u64> = sstables.iter().map(|table| table.id()).collect();
    info!(sstable_count, ?ids, "major compaction: starting full merge");

    let outcome = execute(sstables, manifest, data_dir)?;

    info!(
        new_sst_id = ?outcome.new_sst_id,
        removed_count = outcome.removed_ids.len(),
        "major compaction: complete"
    );
    Ok(Some(outcome))
}
/// Merges all `sstables` into a single set of live point entries and hands
/// them to `finalize_compaction`.
///
/// Steps, in order:
/// 1. Record the id of every input SSTable as "removed".
/// 2. Gather the range tombstones of every input SSTable.
/// 3. Walk a [`MergeIterator`] over full-range scans of all inputs and, for
///    each key, act only on the first record seen for that key:
///    * `RangeDelete` records are ignored (tombstones were gathered in step 2);
///    * a `Delete` is dropped and shadows every later record for the key;
///    * a `Put` is kept unless a range tombstone with a higher LSN covers it.
/// 4. Call `finalize_compaction` with the surviving entries and an empty
///    range-tombstone list, so no tombstones are carried into the output.
///
/// NOTE(review): keeping only the first record per key is correct only if the
/// merge iterator yields records grouped by key with the newest version
/// first — confirm against `MergeIterator`.
///
/// # Errors
///
/// Propagates errors from `full_range_scan_iters` and `finalize_compaction`.
fn execute(
    sstables: &[Arc<SSTable>],
    manifest: &mut Manifest,
    data_dir: &str,
) -> Result<CompactionResult, CompactionError> {
    let sst_refs: Vec<&SSTable> = sstables.iter().map(|s| &**s).collect();
    let removed_ids: Vec<u64> = sstables.iter().map(|s| s.id()).collect();

    let mut all_range_tombstones: Vec<RangeTombstone> = Vec::new();
    for sst in sstables {
        all_range_tombstones.extend(sst.range_tombstone_iter());
    }

    let iters = full_range_scan_iters(&sst_refs)?;
    let merge_iter = MergeIterator::new(iters);

    let mut point_entries: Vec<PointEntry> = Vec::new();
    // Key of the most recent point record that was acted upon; any further
    // record carrying the same key is a shadowed version and is skipped.
    let mut last_key: Option<Vec<u8>> = None;

    for record in merge_iter {
        match record {
            // Range tombstones were collected up front from the SSTables.
            Record::RangeDelete { .. } => {}
            Record::Delete { key, lsn, .. } => {
                if last_key.as_deref() == Some(key.as_slice()) {
                    continue;
                }
                trace!(key = ?key, lsn, "major: dropping point tombstone");
                // The tombstone is not written out, so the key buffer can be
                // moved into `last_key` instead of being cloned.
                last_key = Some(key);
            }
            Record::Put {
                key,
                value,
                lsn,
                timestamp,
            } => {
                if last_key.as_deref() == Some(key.as_slice()) {
                    continue;
                }
                if is_suppressed_by_range(&key, lsn, &all_range_tombstones) {
                    trace!(key = ?key, lsn, "major: Put suppressed by range tombstone");
                    // Suppressed entries are discarded: move the key, no clone.
                    last_key = Some(key);
                    continue;
                }
                // Only a surviving Put needs the key twice (dedup + output).
                last_key = Some(key.clone());
                point_entries.push(PointEntry {
                    key,
                    value: Some(value),
                    lsn,
                    timestamp,
                });
            }
        }
    }

    // Range tombstones have been applied above, so none are carried over.
    finalize_compaction(manifest, data_dir, removed_ids, point_entries, Vec::new())
}
fn is_suppressed_by_range(key: &[u8], put_lsn: u64, range_tombstones: &[RangeTombstone]) -> bool {
for rt in range_tombstones {
if key >= rt.start.as_slice() && key < rt.end.as_slice() && rt.lsn > put_lsn {
return true;
}
}
false
}