use std::path::{Path, PathBuf};
use tracing::info;
/// Tuning knobs that decide which segments get picked for a compaction pass.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of segments that must exist before anything is
    /// selected; with fewer, [`select_segments_for_compaction`] returns an
    /// empty selection.
    pub min_segments_to_compact: usize,
    /// Upper bound on how many segments a single selection may contain.
    pub max_segments_per_pass: usize,
    /// Size limit in bytes: a segment strictly smaller than this is treated
    /// as a compaction candidate.
    pub target_segment_bytes: usize,
    /// Tombstone fraction (0.0..=1.0): a segment whose tombstone ratio is
    /// strictly greater than this is treated as a compaction candidate.
    pub tombstone_ratio_threshold: f64,
}

impl Default for CompactionConfig {
    /// Returns the stock configuration: at least 4 segments, at most 8 per
    /// pass, a 256 MiB size limit, and a 30% tombstone threshold.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        CompactionConfig {
            min_segments_to_compact: 4,
            max_segments_per_pass: 8,
            target_segment_bytes: 256 * MIB,
            tombstone_ratio_threshold: 0.3,
        }
    }
}
/// Bookkeeping record describing a single on-disk segment.
#[derive(Debug, Clone)]
pub struct SegmentMeta {
    /// Path identifying the segment file.
    pub path: PathBuf,
    /// Size of the segment in bytes.
    pub size_bytes: u64,
    /// Lowest LSN covered by the segment (presumably a log sequence
    /// number — confirm against the writer).
    pub min_lsn: u64,
    /// Highest LSN covered by the segment.
    pub max_lsn: u64,
    /// Number of entries that are still live.
    pub live_entries: u64,
    /// Number of entries that are tombstones (deleted markers).
    pub tombstone_entries: u64,
    /// Creation timestamp. NOTE(review): unit and epoch are not established
    /// in this file — confirm against the producer.
    pub created_at: u64,
}

impl SegmentMeta {
    /// Fraction of this segment's entries that are tombstones.
    ///
    /// Returns `0.0` for a segment with no entries at all, so an empty
    /// segment never divides by zero.
    pub fn tombstone_ratio(&self) -> f64 {
        match self.live_entries + self.tombstone_entries {
            0 => 0.0,
            entry_count => self.tombstone_entries as f64 / entry_count as f64,
        }
    }

    /// Reports whether the tombstone ratio is strictly above `threshold`.
    ///
    /// A ratio exactly equal to `threshold` does not qualify.
    pub fn needs_compaction(&self, threshold: f64) -> bool {
        let ratio = self.tombstone_ratio();
        ratio > threshold
    }
}
/// Description of a compaction: which segments go in, where the merged
/// segment goes, and the aggregate figures for the operation.
#[derive(Debug)]
pub struct CompactionResult {
    /// Paths of the segments being merged.
    pub input_segments: Vec<PathBuf>,
    /// Path of the segment that the merge produces.
    pub output_segment: PathBuf,
    /// Total number of tombstone entries across the inputs.
    pub tombstones_removed: u64,
    /// Number of bytes expected to be freed by dropping the tombstones.
    pub bytes_reclaimed: u64,
    /// Lowest LSN across the inputs.
    pub min_lsn: u64,
    /// Highest LSN across the inputs.
    pub max_lsn: u64,
}
/// Chooses which segments should take part in the next compaction pass.
///
/// A segment is a candidate when it is strictly smaller than
/// `config.target_segment_bytes` or when its tombstone ratio is strictly
/// above `config.tombstone_ratio_threshold`. Candidates are ordered by
/// ascending `min_lsn` (ties keep their original slice order) and at most
/// `config.max_segments_per_pass` of them are kept.
///
/// Returns indices into `segments`. The result is empty when `segments`
/// holds fewer than `config.min_segments_to_compact` entries.
pub fn select_segments_for_compaction(
    segments: &[SegmentMeta],
    config: &CompactionConfig,
) -> Vec<usize> {
    // NOTE(review): the minimum-count gate looks at every segment, not just
    // the candidates, so a pass may still end up with fewer than
    // `min_segments_to_compact` selected — confirm this is intended.
    if segments.len() < config.min_segments_to_compact {
        return Vec::new();
    }

    let size_limit = config.target_segment_bytes as u64;
    let mut picked: Vec<usize> = Vec::new();
    for (idx, segment) in segments.iter().enumerate() {
        let undersized = segment.size_bytes < size_limit;
        if undersized || segment.needs_compaction(config.tombstone_ratio_threshold) {
            picked.push(idx);
        }
    }

    // Stable sort: candidates sharing a `min_lsn` stay in slice order.
    // Every index in `picked` came from `enumerate()` above, so indexing
    // `segments` here cannot go out of bounds.
    picked.sort_by_key(|&idx| segments[idx].min_lsn);
    picked.truncate(config.max_segments_per_pass);
    picked
}
/// Builds the plan for merging `segments` into one output segment.
///
/// The output file is placed in `output_dir` and named
/// `segment-{min_lsn}-{max_lsn}.dat`, where the LSN bounds are the minimum
/// `min_lsn` and maximum `max_lsn` over all inputs.
///
/// `bytes_reclaimed` is an estimate: it assumes every entry occupies the same
/// share of the total input bytes and scales that total by the fraction of
/// entries that are tombstones.
///
/// Returns `None` when `segments` is empty. Only the plan is computed here;
/// no files are read or written.
pub fn plan_compaction(segments: &[SegmentMeta], output_dir: &Path) -> Option<CompactionResult> {
    if segments.is_empty() {
        return None;
    }

    let min_lsn = segments.iter().map(|s| s.min_lsn).min().unwrap_or(0);
    let max_lsn = segments.iter().map(|s| s.max_lsn).max().unwrap_or(0);
    let tombstones: u64 = segments.iter().map(|s| s.tombstone_entries).sum();
    let total_bytes: u64 = segments.iter().map(|s| s.size_bytes).sum();
    let total_entries: u64 = segments
        .iter()
        .map(|s| s.live_entries + s.tombstone_entries)
        .sum();

    // `total_bytes * tombstones` can exceed `u64::MAX` for large inputs
    // (e.g. 1 TiB of segments with 2^30 tombstones), so the product is taken
    // in `u128`. The quotient is at most `total_bytes` whenever
    // `tombstones <= total_entries`; the clamp keeps the narrowing cast
    // lossless regardless.
    let reclaimed_wide = u128::from(total_bytes) * u128::from(tombstones)
        / u128::from(total_entries.max(1)); // max(1): no division by zero for entry-less inputs
    let tombstone_bytes = reclaimed_wide.min(u128::from(u64::MAX)) as u64;

    let output_path = output_dir.join(format!("segment-{min_lsn}-{max_lsn}.dat"));

    Some(CompactionResult {
        input_segments: segments.iter().map(|s| s.path.clone()).collect(),
        output_segment: output_path,
        tombstones_removed: tombstones,
        bytes_reclaimed: tombstone_bytes,
        min_lsn,
        max_lsn,
    })
}
/// Flattens per-segment vector lists into one list, discarding deleted ones.
///
/// Each element of `segment_vectors` is one segment's list of
/// `(vector, deleted)` pairs. Vectors whose flag is `false` are cloned into
/// the output in segment order, then in-segment order; vectors whose flag is
/// `true` are skipped and counted.
///
/// Returns `(surviving_vectors, dropped_count)` and emits one `info!` event
/// summarising the merge.
pub fn merge_hnsw_vectors(
    segment_vectors: &[Vec<(Vec<f32>, bool)>],
) -> (Vec<Vec<f32>>, usize) {
    let mut dropped: usize = 0;

    let merged: Vec<Vec<f32>> = segment_vectors
        .iter()
        .flatten()
        .filter_map(|(vector, deleted)| {
            if *deleted {
                dropped += 1;
                None
            } else {
                Some(vector.clone())
            }
        })
        .collect();

    info!(
        merged = merged.len(),
        dropped,
        segments = segment_vectors.len(),
        "HNSW segment merge: vectors merged"
    );

    (merged, dropped)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` uniform 10 MiB segments with consecutive 1000-wide LSN
    /// ranges and a 10% tombstone ratio (900 live, 100 tombstones).
    fn make_segments(n: usize) -> Vec<SegmentMeta> {
        let mut segments = Vec::with_capacity(n);
        for i in 0..n {
            let first_lsn = (i * 1000) as u64;
            let last_lsn = ((i + 1) * 1000 - 1) as u64;
            segments.push(SegmentMeta {
                path: PathBuf::from(format!("seg-{i}.dat")),
                size_bytes: 10 * 1024 * 1024,
                min_lsn: first_lsn,
                max_lsn: last_lsn,
                live_entries: 900,
                tombstone_entries: 100,
                created_at: 0,
            });
        }
        segments
    }

    #[test]
    fn select_segments_respects_min_count() {
        // Two segments is below the minimum of four, so nothing is selected.
        let config = CompactionConfig {
            min_segments_to_compact: 4,
            ..Default::default()
        };
        let segments = make_segments(2);

        let selected = select_segments_for_compaction(&segments, &config);

        assert!(selected.is_empty());
    }

    #[test]
    fn select_segments_picks_small_and_tombstoned() {
        // All six segments qualify (undersized, and 10% tombstones > 5%),
        // so the per-pass cap of four decides the count.
        let config = CompactionConfig {
            min_segments_to_compact: 4,
            max_segments_per_pass: 4,
            target_segment_bytes: 256 * 1024 * 1024,
            tombstone_ratio_threshold: 0.05,
        };
        let segments = make_segments(6);

        let selected = select_segments_for_compaction(&segments, &config);

        assert_eq!(selected.len(), 4);
    }

    #[test]
    fn plan_compaction_output() {
        let segments = make_segments(3);

        let result = plan_compaction(&segments, Path::new("/tmp")).unwrap();

        assert_eq!(result.input_segments.len(), 3);
        assert_eq!(result.min_lsn, 0);
        assert_eq!(result.max_lsn, 2999);
        assert!(result.tombstones_removed > 0);
    }

    #[test]
    fn hnsw_merge_drops_tombstones() {
        // Five vectors across two segments; two of them are flagged deleted.
        let seg1 = vec![
            (vec![1.0, 0.0], false),
            (vec![2.0, 0.0], true),
            (vec![3.0, 0.0], false),
        ];
        let seg2 = vec![
            (vec![4.0, 0.0], false),
            (vec![5.0, 0.0], true),
        ];

        let (merged, dropped) = merge_hnsw_vectors(&[seg1, seg2]);

        assert_eq!(merged.len(), 3);
        assert_eq!(dropped, 2);
    }

    #[test]
    fn tombstone_ratio() {
        // 300 tombstones out of 1000 entries gives a ratio of 0.3.
        let seg = SegmentMeta {
            path: PathBuf::from("test.dat"),
            size_bytes: 1000,
            min_lsn: 0,
            max_lsn: 100,
            live_entries: 700,
            tombstone_entries: 300,
            created_at: 0,
        };

        let ratio = seg.tombstone_ratio();

        assert!((ratio - 0.3).abs() < 0.01);
        assert!(seg.needs_compaction(0.25));
        assert!(!seg.needs_compaction(0.35));
    }
}