use std::collections::HashSet;
use crate::{
compaction::{
executor::{CompactionError, CompactionOutcome},
planner::{CompactionInput, CompactionPlanner, CompactionSnapshot, CompactionTask},
},
id::FileId,
manifest::{GcPlanState, GcSstRef, ManifestError, VersionState, WalSegmentRef},
ondisk::sstable::{SsTableDescriptor, SsTableId, storage_path_from_manifest},
};
/// Builds a [`CompactionSnapshot`] from `version` and asks `planner` for a compaction task.
///
/// The planner is not consulted at all when the snapshot is empty; in that case `None`
/// is returned. Otherwise the planner's answer (which may itself be `None`) is returned.
pub(crate) fn plan_from_version<P>(planner: &P, version: &VersionState) -> Option<CompactionTask>
where
    P: CompactionPlanner,
{
    let snapshot: CompactionSnapshot = version.into();
    if snapshot.is_empty() {
        None
    } else {
        planner.plan(&snapshot)
    }
}
/// Like `plan_from_version`, but forwards `min_level` to
/// [`CompactionPlanner::plan_with_min_level`].
///
/// Returns `None` without consulting the planner when the snapshot built from `version`
/// is empty; otherwise returns the planner's answer.
pub(crate) fn plan_from_version_with_min_level<P>(
    planner: &P,
    version: &VersionState,
    min_level: usize,
) -> Option<CompactionTask>
where
    P: CompactionPlanner,
{
    let snapshot: CompactionSnapshot = version.into();
    if snapshot.is_empty() {
        None
    } else {
        planner.plan_with_min_level(&snapshot, min_level)
    }
}
/// Resolves every planner-selected input of `task` into an [`SsTableDescriptor`].
///
/// For each `CompactionInput { level, sst_id }`, the matching entry is looked up in
/// `version.ssts()[level]`. The descriptor then carries:
/// - the entry's stats, when present;
/// - its WAL segment ids;
/// - its data/delete paths, mapped through `storage_path_from_manifest` relative to
///   `sst_root`.
///
/// Descriptors are returned in the same order as `task.input`.
///
/// # Errors
///
/// Returns `CompactionError::Manifest(ManifestError::Invariant(..))` when the selected
/// level, or the selected SST id within that level, is not present in `version`.
pub(crate) fn resolve_inputs(
    sst_root: &fusio::path::Path,
    version: &VersionState,
    task: &CompactionTask,
) -> Result<Vec<SsTableDescriptor>, CompactionError> {
    let invariant =
        |msg: &'static str| CompactionError::Manifest(ManifestError::Invariant(msg));

    task.input
        .iter()
        .map(|CompactionInput { level, sst_id }| {
            let entry = version
                .ssts()
                .get(*level)
                .ok_or_else(|| invariant("planner selected level missing in manifest"))?
                .iter()
                .find(|candidate| candidate.sst_id() == sst_id)
                .ok_or_else(|| invariant("planner selected SST missing in manifest"))?;

            let base = SsTableDescriptor::new(entry.sst_id().clone(), *level);
            let base = match entry.stats().cloned() {
                Some(stats) => base.with_stats(stats),
                None => base,
            };
            Ok(base
                .with_wal_ids(entry.wal_segments().map(|ids| ids.to_vec()))
                .with_storage_paths(
                    storage_path_from_manifest(sst_root, entry.data_path()),
                    entry
                        .delete_path()
                        .map(|path| storage_path_from_manifest(sst_root, path)),
                ))
        })
        .collect()
}
/// Collects the WAL file ids still referenced after a compaction.
///
/// The ids are gathered from:
/// - every SST in `version` whose id is not in `removed`;
/// - every SST descriptor in `added`.
///
/// Returns `None` as soon as any of those SSTs has no WAL id list, because the full set
/// of referenced WAL files cannot then be determined. Surviving manifest entries are
/// visited before `added`.
pub(crate) fn wal_ids_for_remaining_ssts(
    version: &VersionState,
    removed: &HashSet<SsTableId>,
    added: &[SsTableDescriptor],
) -> Option<HashSet<FileId>> {
    let mut referenced = HashSet::new();
    for level in version.ssts() {
        for sst in level {
            if !removed.contains(sst.sst_id()) {
                referenced.extend(sst.wal_segments()?.iter().cloned());
            }
        }
    }
    for output in added {
        referenced.extend(output.wal_ids()?.iter().cloned());
    }
    Some(referenced)
}
/// Computes the WAL segment list the manifest should hold after a compaction replaces
/// `removed_ssts` with `added_ssts`.
///
/// An existing segment is "retained" when its file id is referenced by some SST that
/// survives the compaction (as computed by `wal_ids_for_remaining_ssts`).
///
/// Returns:
/// - `None` — leave the WAL segment list unchanged — when:
///   - the referenced id set cannot be determined (some surviving or added SST has no
///     WAL id list);
///   - `version` records no WAL segments;
///   - filtering would not drop any segment.
/// - `Some(vec![])` when no existing segment is still referenced.
/// - Otherwise `Some(list)`. When the existing segments' `seq` values have no gaps,
///   `list` is exactly the retained segments in their original order. When gaps exist,
///   see the comment on that branch below.
pub(crate) fn wal_segments_after_compaction(
    version: &VersionState,
    removed_ssts: &[SsTableDescriptor],
    added_ssts: &[SsTableDescriptor],
) -> Option<Vec<WalSegmentRef>> {
    let removed_ids: HashSet<SsTableId> = removed_ssts.iter().map(|d| d.id().clone()).collect();
    // Propagates `None` when any surviving/added SST lacks WAL id information.
    let wal_ids = wal_ids_for_remaining_ssts(version, &removed_ids, added_ssts)?;
    let existing = version.wal_segments();
    if existing.is_empty() {
        return None;
    }
    // Keep only segments still referenced by a surviving SST, preserving manifest order.
    let mut filtered: Vec<WalSegmentRef> = existing
        .iter()
        .filter(|seg| wal_ids.contains(seg.file_id()))
        .cloned()
        .collect();
    // Nothing would be dropped: signal "no change" rather than returning a copy.
    if filtered == existing {
        return None;
    }
    if filtered.is_empty() {
        return Some(Vec::new());
    }
    // A "gap" is any adjacent pair of existing segments whose `seq` values are not
    // consecutive.
    let has_seq_gaps = existing
        .windows(2)
        .any(|pair| pair[1].seq() > pair[0].seq().saturating_add(1));
    if has_seq_gaps {
        // With gaps present, the filtered list is not returned as-is:
        // - every existing segment before the first retained one is kept as well
        //   (unreferenced segments are dropped only after that point);
        // - if the first retained segment is already at index 0, the full existing list
        //   is returned, as `Some`, not `None`.
        // `filtered` is non-empty here and is a subset of `existing`, so both lookups
        // below are expected to succeed; only the `first_idx > 0` test selects the branch.
        // NOTE(review): the rationale for keeping the unreferenced prefix when gaps exist
        // is not visible in this file — confirm this conservative behavior is intended.
        if let Some(first_retained) = filtered.first()
            && let Some(first_idx) = existing
                .iter()
                .position(|seg| seg.file_id() == first_retained.file_id())
            && first_idx > 0
        {
            let mut with_gap = existing[..first_idx].to_vec();
            with_gap.append(&mut filtered);
            Some(with_gap)
        } else {
            Some(existing.to_vec())
        }
    } else {
        Some(filtered)
    }
}
/// Updates `outcome`'s WAL bookkeeping after a compaction.
///
/// Fields written:
/// - `outcome.wal_segments` is always set to `Some(final list)`. The final list is the
///   result of `wal_segments_after_compaction` when that returns `Some`; otherwise it is
///   `existing_wal_segments` unchanged.
/// - `outcome.obsolete_wal_segments` holds the members of `existing_wal_segments` that
///   are absent from the final list. It is empty when the list was left unchanged.
/// - `outcome.wal_floor` is the supplied `wal_floor` if present; otherwise it is the
///   first segment of the final list (or `None` if that list is empty).
pub(crate) fn reconcile_wal_segments(
    version: &VersionState,
    outcome: &mut CompactionOutcome,
    existing_wal_segments: &[WalSegmentRef],
    wal_floor: Option<WalSegmentRef>,
) {
    let wal_from_helper =
        wal_segments_after_compaction(version, &outcome.remove_ssts, &outcome.outputs);
    let (final_wal_segments, obsolete_wal_segments) = match wal_from_helper {
        Some(filtered) => {
            let obsolete = existing_wal_segments
                .iter()
                .filter(|seg| !filtered.contains(*seg))
                .cloned()
                .collect();
            (filtered, obsolete)
        }
        None => (existing_wal_segments.to_vec(), Vec::new()),
    };
    // `first()` is `None` for an empty list, so no separate emptiness check is needed.
    outcome.wal_floor = wal_floor.or_else(|| final_wal_segments.first().cloned());
    outcome.wal_segments = Some(final_wal_segments);
    outcome.obsolete_wal_segments = obsolete_wal_segments;
}
pub(crate) fn gc_plan_from_outcome(
outcome: &CompactionOutcome,
) -> Result<Option<GcPlanState>, CompactionError> {
if outcome.remove_ssts.is_empty() && outcome.obsolete_wal_segments.is_empty() {
return Ok(None);
}
let mut obsolete_ssts = Vec::new();
for desc in &outcome.remove_ssts {
let data_path = desc
.data_path()
.cloned()
.ok_or(CompactionError::MissingPath("data"))?;
obsolete_ssts.push(GcSstRef {
id: desc.id().clone(),
level: desc.level() as u32,
data_path,
delete_path: desc.delete_path().cloned(),
});
}
Ok(Some(GcPlanState {
obsolete_ssts,
obsolete_wal_segments: outcome.obsolete_wal_segments.clone(),
}))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{compaction::planner::CompactionSnapshot, id::FileIdGenerator, manifest::TableId};

    /// Planner stub that schedules every level-0 SST for compaction into level 1.
    /// It declines to plan when level 0 is absent or empty.
    struct AlwaysCompactPlanner;

    impl CompactionPlanner for AlwaysCompactPlanner {
        fn plan(&self, snapshot: &CompactionSnapshot) -> Option<CompactionTask> {
            let l0 = snapshot.level(0)?;
            (!l0.is_empty()).then(|| CompactionTask {
                source_level: 0,
                target_level: 1,
                input: l0
                    .files()
                    .iter()
                    .map(|file| CompactionInput {
                        level: 0,
                        sst_id: file.sst_id.clone(),
                    })
                    .collect(),
                key_range: None,
            })
        }
    }

    #[test]
    fn plan_from_version_empty_returns_none() {
        let ids = FileIdGenerator::default();
        let empty_version = VersionState::empty(TableId::new(&ids));
        assert!(plan_from_version(&AlwaysCompactPlanner, &empty_version).is_none());
    }

    #[test]
    fn gc_plan_from_empty_outcome_returns_none() {
        let empty = CompactionOutcome {
            add_ssts: Vec::new(),
            remove_ssts: Vec::new(),
            target_level: 0,
            wal_segments: None,
            tombstone_watermark: None,
            outputs: Vec::new(),
            obsolete_sst_ids: Vec::new(),
            wal_floor: None,
            obsolete_wal_segments: Vec::new(),
        };
        let plan = gc_plan_from_outcome(&empty).expect("gc plan should build for empty outcome");
        assert!(plan.is_none());
    }
}