use crate::compaction::{CompactionError, CompactionResult, finalize_compaction};
use crate::engine::EngineConfig;
use crate::engine::RangeTombstone;
use crate::manifest::Manifest;
use crate::sstable::{GetResult, PointEntry, SSTable, SSTableError};
use std::sync::Arc;
use tracing::{debug, info, trace};
/// Runs one round of tombstone-driven compaction, if any SSTable qualifies.
///
/// A candidate is chosen by `select_candidate` and then rewritten by `execute`.
///
/// # Returns
/// - `Ok(None)` when no SSTable met the thresholds, or when the chosen candidate
///   had nothing that could be dropped (`execute` reported no removed ids).
/// - `Ok(Some(result))` when the candidate was rewritten.
///
/// # Errors
/// Propagates any `CompactionError` raised while rewriting the candidate.
pub fn maybe_compact(
    sstables: &[Arc<SSTable>],
    manifest: &mut Manifest,
    data_dir: &str,
    config: &EngineConfig,
) -> Result<Option<CompactionResult>, CompactionError> {
    let target_idx = if let Some(idx) = select_candidate(sstables, config) {
        idx
    } else {
        debug!(
            sstable_count = sstables.len(),
            "tombstone compaction: no candidate met threshold"
        );
        return Ok(None);
    };

    let target = &sstables[target_idx];
    let props = &target.properties;
    // Point and range tombstones are reported together as a single count.
    let combined_tombstones = props.tombstone_count + props.range_tombstones_count;
    info!(
        target_id = target.id(),
        tombstone_count = combined_tombstones,
        record_count = props.record_count,
        "tombstone compaction: starting rewrite"
    );

    let outcome = execute(sstables, target_idx, manifest, data_dir, config)?;

    // An empty `removed_ids` means `execute` decided not to rewrite anything.
    if outcome.removed_ids.is_empty() {
        debug!(
            target_id = target.id(),
            "tombstone compaction: candidate selected but no tombstones could be dropped"
        );
        Ok(None)
    } else {
        info!(
            new_sst_id = ?outcome.new_sst_id,
            removed_count = outcome.removed_ids.len(),
            "tombstone compaction: complete"
        );
        Ok(Some(outcome))
    }
}
/// Chooses the SSTable with the highest tombstone-to-record ratio.
///
/// An SSTable is eligible only if all of the following hold:
/// - its age is at least `config.tombstone_compaction_interval` seconds;
/// - it has at least one point or range tombstone;
/// - its ratio is not below `config.tombstone_ratio_threshold`.
///
/// The ratio is `(tombstone_count + range_tombstones_count) / max(record_count, 1)`.
/// When two SSTables tie on ratio, the one appearing first in `sstables` wins.
/// Returns `None` if no SSTable is eligible.
fn select_candidate(sstables: &[Arc<SSTable>], config: &EngineConfig) -> Option<usize> {
    // A clock before the Unix epoch is treated as "now = 0".
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_secs())
        .unwrap_or(0);
    let min_age_secs = config.tombstone_compaction_interval as u64;

    sstables
        .iter()
        .enumerate()
        .filter_map(|(idx, sst)| {
            let props = &sst.properties;
            // NOTE(review): creation_timestamp is divided by 1e9, i.e. it is assumed
            // to be in nanoseconds — confirm against the SSTable writer.
            let born_secs = props.creation_timestamp / 1_000_000_000;
            if now_secs.saturating_sub(born_secs) < min_age_secs {
                return None;
            }
            let tombstones = props.tombstone_count + props.range_tombstones_count;
            if tombstones == 0 {
                return None;
            }
            let ratio = tombstones as f64 / props.record_count.max(1) as f64;
            if ratio < config.tombstone_ratio_threshold {
                return None;
            }
            Some((idx, ratio))
        })
        // Strictly greater replaces the current best, so ties keep the earliest index.
        .fold(None, |best: Option<(usize, f64)>, (idx, ratio)| match best {
            Some((_, best_ratio)) if ratio <= best_ratio => best,
            _ => Some((idx, ratio)),
        })
        .map(|(idx, _)| idx)
}
fn execute(
sstables: &[Arc<SSTable>],
target_idx: usize,
manifest: &mut Manifest,
data_dir: &str,
config: &EngineConfig,
) -> Result<CompactionResult, CompactionError> {
let target = &*sstables[target_idx];
let older_sstables: Vec<&SSTable> = sstables
.iter()
.enumerate()
.filter(|(i, _)| *i != target_idx && sstables[*i].id() < target.id())
.map(|(_, s)| &**s)
.collect();
let min_key = target.properties.min_key.clone();
let mut max_key = target.properties.max_key.clone();
max_key.push(0xFF);
let scan_iter = target.scan(&min_key, &max_key)?;
let mut point_entries: Vec<PointEntry> = Vec::new();
let mut range_tombstones: Vec<RangeTombstone> = Vec::new();
let mut range_candidates: Vec<RangeTombstone> = Vec::new();
let mut last_key: Option<Vec<u8>> = None;
let mut dropped_anything = false;
for record in scan_iter {
match record {
crate::engine::utils::Record::Put {
key,
value,
lsn,
timestamp,
} => {
if last_key.as_ref() == Some(&key) {
dropped_anything = true;
continue;
}
last_key = Some(key.clone());
point_entries.push(PointEntry {
key,
value: Some(value),
lsn,
timestamp,
});
}
crate::engine::utils::Record::Delete {
key,
lsn,
timestamp,
} => {
if last_key.as_ref() == Some(&key) {
dropped_anything = true;
continue;
}
last_key = Some(key.clone());
if can_drop_point_tombstone(&key, &older_sstables, config)? {
trace!(key = ?key, lsn, "dropping point tombstone — no older data found");
dropped_anything = true;
continue;
}
point_entries.push(PointEntry {
key,
value: None,
lsn,
timestamp,
});
}
crate::engine::utils::Record::RangeDelete {
start,
end,
lsn,
timestamp,
} => {
if config.tombstone_range_drop {
range_candidates.push(RangeTombstone {
start,
end,
lsn,
timestamp,
});
} else {
range_tombstones.push(RangeTombstone {
start,
end,
lsn,
timestamp,
});
}
}
}
}
for rt in range_candidates {
let safe_in_older = can_drop_range_tombstone(&rt.start, &rt.end, rt.lsn, &older_sstables)?;
let covers_own_puts = safe_in_older
&& point_entries.iter().any(|pe| {
pe.value.is_some()
&& pe.key.as_slice() >= rt.start.as_slice()
&& pe.key.as_slice() < rt.end.as_slice()
&& pe.lsn < rt.lsn
});
if safe_in_older && !covers_own_puts {
trace!(
start = ?rt.start, end = ?rt.end, lsn = rt.lsn,
"dropping range tombstone — no covered keys in older SSTables or same SSTable"
);
dropped_anything = true;
} else {
range_tombstones.push(rt);
}
}
if !dropped_anything {
return Ok(CompactionResult {
removed_ids: Vec::new(),
new_sst_path: None,
new_sst_id: None,
});
}
let removed_ids = vec![target.id()];
finalize_compaction(
manifest,
data_dir,
removed_ids,
point_entries,
range_tombstones,
)
}
/// Reports whether a point tombstone for `key` can be discarded.
///
/// Only SSTables in `others` whose bloom filter may contain `key` are examined.
/// For each one:
/// - if `config.tombstone_bloom_fallback` is off, the bloom "maybe" alone keeps
///   the tombstone (`Ok(false)`);
/// - if it is on, a point `get` is performed, and any result other than
///   `GetResult::NotFound` keeps the tombstone.
///
/// Returns `Ok(true)` when no examined SSTable keeps it.
///
/// # Errors
/// Propagates `SSTableError` from `get`.
fn can_drop_point_tombstone(
    key: &[u8],
    others: &[&SSTable],
    config: &EngineConfig,
) -> Result<bool, SSTableError> {
    for candidate in others.iter().filter(|sst| sst.bloom_may_contain(key)) {
        if !config.tombstone_bloom_fallback {
            return Ok(false);
        }
        if !matches!(candidate.get(key)?, GetResult::NotFound) {
            return Ok(false);
        }
    }
    Ok(true)
}
/// Reports whether a range tombstone `[start, end)` with LSN `tombstone_lsn`
/// can be discarded with respect to the SSTables in `others`.
///
/// SSTables whose `[min_key, max_key]` span does not overlap the range are skipped.
/// Each overlapping SSTable is scanned over the range. Any Put or Delete with an
/// LSN below `tombstone_lsn` keeps the tombstone (`Ok(false)`); RangeDelete
/// records are ignored.
///
/// # Errors
/// Propagates `SSTableError` from `scan`.
fn can_drop_range_tombstone(
    start: &[u8],
    end: &[u8],
    tombstone_lsn: u64,
    others: &[&SSTable],
) -> Result<bool, SSTableError> {
    for sst in others {
        let props = &sst.properties;
        let overlaps = props.max_key.as_slice() >= start && props.min_key.as_slice() < end;
        if !overlaps {
            continue;
        }
        let covers_older_record = sst.scan(start, end)?.into_iter().any(|record| match record {
            crate::engine::utils::Record::Put { lsn, .. }
            | crate::engine::utils::Record::Delete { lsn, .. } => lsn < tombstone_lsn,
            crate::engine::utils::Record::RangeDelete { .. } => false,
        });
        if covers_older_record {
            return Ok(false);
        }
    }
    Ok(true)
}