Skip to main content

rvf_runtime/
compaction.rs

1//! Background compaction for dead space reclamation.
2//!
3//! Compaction scheduling policy (from spec 10, section 7):
4//! - IO budget: max 30% of IOPS (60% in emergency)
5//! - Priority: queries > ingest > compaction
6//! - Triggers: dead_space > 20%, segment_count > 32, time > 60s
7//! - Emergency: dead_space > 70% -> preempt ingest
8//!
9//! Segment selection order:
10//! 1. Tombstoned segments (reclaim dead space)
11//! 2. Small VEC_SEGs (< 1MB, merge into larger)
12//! 3. High-overlap INDEX_SEGs
13//! 4. Cold OVERLAY_SEGs
14
15/// Compaction trigger thresholds.
16#[allow(dead_code)]
17pub(crate) struct CompactionThresholds {
18    /// Minimum dead space ratio to trigger compaction.
19    pub dead_space_ratio: f64,
20    /// Maximum segment count before compaction.
21    pub max_segment_count: u32,
22    /// Minimum seconds since last compaction.
23    pub min_interval_secs: u64,
24    /// Emergency dead space ratio (preempts ingest).
25    pub emergency_ratio: f64,
26}
27
28impl Default for CompactionThresholds {
29    fn default() -> Self {
30        Self {
31            dead_space_ratio: 0.20,
32            max_segment_count: 32,
33            min_interval_secs: 60,
34            emergency_ratio: 0.70,
35        }
36    }
37}
38
39/// Compaction decision.
40#[derive(Clone, Copy, Debug, PartialEq, Eq)]
41#[allow(dead_code)]
42pub(crate) enum CompactionDecision {
43    /// No compaction needed.
44    None,
45    /// Normal compaction should run.
46    Normal,
47    /// Emergency compaction (high dead space).
48    Emergency,
49}
50
51/// Evaluate whether compaction should run.
52#[allow(dead_code)]
53pub(crate) fn evaluate_triggers(
54    dead_space_ratio: f64,
55    segment_count: u32,
56    secs_since_last: u64,
57    thresholds: &CompactionThresholds,
58) -> CompactionDecision {
59    // Emergency check first.
60    if dead_space_ratio > thresholds.emergency_ratio {
61        return CompactionDecision::Emergency;
62    }
63
64    // Check all normal conditions.
65    if secs_since_last < thresholds.min_interval_secs {
66        return CompactionDecision::None;
67    }
68
69    if dead_space_ratio > thresholds.dead_space_ratio {
70        return CompactionDecision::Normal;
71    }
72
73    if segment_count > thresholds.max_segment_count {
74        return CompactionDecision::Normal;
75    }
76
77    CompactionDecision::None
78}
79
80/// Represents a compaction plan: which segments to compact and how.
81#[derive(Clone, Debug)]
82#[allow(dead_code)]
83pub(crate) struct CompactionPlan {
84    /// Segment IDs to compact (input).
85    pub source_segments: Vec<u64>,
86    /// Whether this is emergency compaction.
87    pub emergency: bool,
88    /// IO budget as a fraction (0.30 normal, 0.60 emergency).
89    pub io_budget: f64,
90}
91
92impl CompactionPlan {
93    /// Create a normal compaction plan.
94    #[allow(dead_code)]
95    pub(crate) fn normal(segments: Vec<u64>) -> Self {
96        Self {
97            source_segments: segments,
98            emergency: false,
99            io_budget: 0.30,
100        }
101    }
102
103    /// Create an emergency compaction plan.
104    #[allow(dead_code)]
105    pub(crate) fn emergency(segments: Vec<u64>) -> Self {
106        Self {
107            source_segments: segments,
108            emergency: true,
109            io_budget: 0.60,
110        }
111    }
112}
113
114/// Select segments for compaction based on the tiered strategy.
115///
116/// Priority:
117/// 1. Tombstoned segments
118/// 2. Small VEC_SEGs (< threshold)
119/// 3. Remaining segments by age
120#[allow(dead_code)]
121pub(crate) fn select_segments(
122    segment_dir: &[(u64, u64, u8, bool)], // (seg_id, payload_len, seg_type, is_tombstoned)
123    max_segments: usize,
124) -> Vec<u64> {
125    let mut selected = Vec::new();
126
127    // Phase 1: tombstoned segments.
128    for &(seg_id, _, _, tombstoned) in segment_dir {
129        if tombstoned && selected.len() < max_segments {
130            selected.push(seg_id);
131        }
132    }
133
134    // Phase 2: small VEC_SEGs (< 1MB).
135    let small_threshold = 1024 * 1024;
136    for &(seg_id, payload_len, seg_type, _) in segment_dir {
137        if seg_type == 0x01 && payload_len < small_threshold && selected.len() < max_segments
138            && !selected.contains(&seg_id)
139        {
140            selected.push(seg_id);
141        }
142    }
143
144    // Phase 3: fill remaining with oldest segments.
145    for &(seg_id, _, _, _) in segment_dir {
146        if selected.len() >= max_segments {
147            break;
148        }
149        if !selected.contains(&seg_id) {
150            selected.push(seg_id);
151        }
152    }
153
154    selected
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160
161    #[test]
162    fn no_compaction_when_fresh() {
163        let decision = evaluate_triggers(0.10, 10, 30, &CompactionThresholds::default());
164        assert_eq!(decision, CompactionDecision::None);
165    }
166
167    #[test]
168    fn normal_compaction_on_dead_space() {
169        let decision = evaluate_triggers(0.25, 10, 120, &CompactionThresholds::default());
170        assert_eq!(decision, CompactionDecision::Normal);
171    }
172
173    #[test]
174    fn normal_compaction_on_segment_count() {
175        let decision = evaluate_triggers(0.10, 50, 120, &CompactionThresholds::default());
176        assert_eq!(decision, CompactionDecision::Normal);
177    }
178
179    #[test]
180    fn emergency_compaction_on_high_dead_space() {
181        let decision = evaluate_triggers(0.75, 10, 10, &CompactionThresholds::default());
182        assert_eq!(decision, CompactionDecision::Emergency);
183    }
184
185    #[test]
186    fn no_compaction_before_interval() {
187        let decision = evaluate_triggers(0.25, 50, 30, &CompactionThresholds::default());
188        // Even though dead_space and segment_count exceed thresholds,
189        // interval hasn't passed.
190        assert_eq!(decision, CompactionDecision::None);
191    }
192
193    #[test]
194    fn select_tombstoned_first() {
195        let segments = vec![
196            (1, 500_000, 0x01, false),
197            (2, 100_000, 0x01, true),  // tombstoned
198            (3, 200_000, 0x01, false),
199            (4, 50_000, 0x01, true),   // tombstoned
200        ];
201        let selected = select_segments(&segments, 3);
202        // Tombstoned segments (2, 4) should come first.
203        assert_eq!(selected[0], 2);
204        assert_eq!(selected[1], 4);
205        assert_eq!(selected.len(), 3);
206    }
207}