// kstone_core/compaction.rs

//! Background compaction for LSM engine
//!
//! Compaction merges multiple SST files into one, keeping only the latest version
//! of each key. This reduces read amplification and reclaims disk space.
//!
//! # Compaction Strategy
//!
//! Uses a simple level-based compaction approach:
//! - Triggers when a stripe has too many SST files (default: ≥10 SSTs)
//! - Merges all SSTs in a stripe into a single new SST
//! - Removes tombstones (deleted records) during merge
//! - Keeps newest version of each key (highest SeqNo)
13
14use crate::{Error, Result, Record, sst::{SstWriter, SstReader}};
15use std::collections::BTreeMap;
16use std::path::PathBuf;
17use std::fs;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
/// Default compaction trigger: compact when stripe has this many SSTs
pub const DEFAULT_SST_THRESHOLD: usize = 10;

/// Minimum SSTs required to trigger compaction (must have at least 2 to merge)
pub const MIN_SSTS_TO_COMPACT: usize = 2;

/// Legacy constant for backward compatibility
pub const COMPACTION_THRESHOLD: usize = DEFAULT_SST_THRESHOLD;

/// Compaction configuration
///
/// Tunables for the background compaction task. Construct one with the
/// `with_*` builder methods, or rely on [`Default`] for sensible values.
#[derive(Clone, Debug)]
pub struct CompactionConfig {
    /// Enable/disable automatic background compaction
    pub enabled: bool,

    /// Trigger compaction when stripe has this many SSTs
    pub sst_threshold: usize,

    /// How often to check for compaction opportunities (in seconds)
    pub check_interval_secs: u64,

    /// Maximum number of stripes to compact concurrently
    pub max_concurrent_compactions: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        CompactionConfig {
            enabled: true,
            sst_threshold: DEFAULT_SST_THRESHOLD,
            // Look for compaction candidates once per minute.
            check_interval_secs: 60,
            // Allow up to four stripes to compact in parallel.
            max_concurrent_compactions: 4,
        }
    }
}

impl CompactionConfig {
    /// Create a new compaction config with all defaults.
    pub fn new() -> Self {
        Default::default()
    }

    /// Return a config with automatic background compaction switched off.
    pub fn disabled() -> Self {
        let mut config = Self::new();
        config.enabled = false;
        config
    }

    /// Builder: set the SST-count threshold that triggers compaction.
    ///
    /// Values below [`MIN_SSTS_TO_COMPACT`] are clamped up, since merging
    /// fewer than two SSTs is pointless.
    pub fn with_sst_threshold(mut self, threshold: usize) -> Self {
        self.sst_threshold = if threshold < MIN_SSTS_TO_COMPACT {
            MIN_SSTS_TO_COMPACT
        } else {
            threshold
        };
        self
    }

    /// Builder: set how often (in seconds) to check for compaction work.
    pub fn with_check_interval(mut self, seconds: u64) -> Self {
        self.check_interval_secs = seconds;
        self
    }

    /// Builder: set the maximum number of concurrent compactions (at least 1).
    pub fn with_max_concurrent(mut self, max: usize) -> Self {
        self.max_concurrent_compactions = if max == 0 { 1 } else { max };
        self
    }
}
89
/// Statistics about compaction operations
///
/// A plain snapshot of the counters held in [`CompactionStatsAtomic`].
#[derive(Clone, Debug, Default)]
pub struct CompactionStats {
    /// Total number of compactions performed
    pub total_compactions: u64,

    /// Total number of SSTs merged
    pub total_ssts_merged: u64,

    /// Total number of SSTs created
    pub total_ssts_created: u64,

    /// Total bytes read during compaction
    pub total_bytes_read: u64,

    /// Total bytes written during compaction
    pub total_bytes_written: u64,

    /// Total bytes reclaimed (deleted records)
    pub total_bytes_reclaimed: u64,

    /// Total number of records deduplicated
    pub total_records_deduplicated: u64,

    /// Total number of tombstones removed
    pub total_tombstones_removed: u64,

    /// Number of currently active compactions
    pub active_compactions: u64,
}

/// Thread-safe compaction statistics using atomics
///
/// Cloning is cheap: every clone shares the same underlying counters, so a
/// clone handed to a background task updates the same statistics.
#[derive(Clone)]
pub struct CompactionStatsAtomic {
    total_compactions: Arc<AtomicU64>,
    total_ssts_merged: Arc<AtomicU64>,
    total_ssts_created: Arc<AtomicU64>,
    total_bytes_read: Arc<AtomicU64>,
    total_bytes_written: Arc<AtomicU64>,
    total_bytes_reclaimed: Arc<AtomicU64>,
    total_records_deduplicated: Arc<AtomicU64>,
    total_tombstones_removed: Arc<AtomicU64>,
    active_compactions: Arc<AtomicU64>,
}

impl Default for CompactionStatsAtomic {
    fn default() -> Self {
        Self::new()
    }
}

impl CompactionStatsAtomic {
    /// Create new atomic statistics with every counter at zero.
    pub fn new() -> Self {
        // Local helper keeps the field list readable.
        fn zero() -> Arc<AtomicU64> {
            Arc::new(AtomicU64::new(0))
        }
        Self {
            total_compactions: zero(),
            total_ssts_merged: zero(),
            total_ssts_created: zero(),
            total_bytes_read: zero(),
            total_bytes_written: zero(),
            total_bytes_reclaimed: zero(),
            total_records_deduplicated: zero(),
            total_tombstones_removed: zero(),
            active_compactions: zero(),
        }
    }

    /// Get a snapshot of current statistics.
    ///
    /// Each counter is loaded independently with relaxed ordering, so the
    /// snapshot is not a single atomic view across all counters.
    pub fn snapshot(&self) -> CompactionStats {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        CompactionStats {
            total_compactions: read(&self.total_compactions),
            total_ssts_merged: read(&self.total_ssts_merged),
            total_ssts_created: read(&self.total_ssts_created),
            total_bytes_read: read(&self.total_bytes_read),
            total_bytes_written: read(&self.total_bytes_written),
            total_bytes_reclaimed: read(&self.total_bytes_reclaimed),
            total_records_deduplicated: read(&self.total_records_deduplicated),
            total_tombstones_removed: read(&self.total_tombstones_removed),
            active_compactions: read(&self.active_compactions),
        }
    }

    /// Bump the total and active compaction counters; the returned guard
    /// decrements the active count again when dropped.
    pub fn start_compaction(&self) -> CompactionGuard {
        self.active_compactions.fetch_add(1, Ordering::Relaxed);
        self.total_compactions.fetch_add(1, Ordering::Relaxed);
        CompactionGuard { stats: self.clone() }
    }

    /// Record SSTs merged.
    pub fn record_ssts_merged(&self, count: u64) {
        self.total_ssts_merged.fetch_add(count, Ordering::Relaxed);
    }

    /// Record SSTs created.
    pub fn record_ssts_created(&self, count: u64) {
        self.total_ssts_created.fetch_add(count, Ordering::Relaxed);
    }

    /// Record bytes read.
    pub fn record_bytes_read(&self, bytes: u64) {
        self.total_bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record bytes written.
    pub fn record_bytes_written(&self, bytes: u64) {
        self.total_bytes_written.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record bytes reclaimed.
    pub fn record_bytes_reclaimed(&self, bytes: u64) {
        self.total_bytes_reclaimed.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record records deduplicated.
    pub fn record_records_deduplicated(&self, count: u64) {
        self.total_records_deduplicated.fetch_add(count, Ordering::Relaxed);
    }

    /// Record tombstones removed.
    pub fn record_tombstones_removed(&self, count: u64) {
        self.total_tombstones_removed.fetch_add(count, Ordering::Relaxed);
    }
}

/// RAII guard that decrements the active-compaction count on drop.
pub struct CompactionGuard {
    stats: CompactionStatsAtomic,
}

impl Drop for CompactionGuard {
    fn drop(&mut self) {
        self.stats.active_compactions.fetch_sub(1, Ordering::Relaxed);
    }
}
231
/// Compaction manager for a stripe
///
/// Holds the stripe id (used in SST file names) and the directory where that
/// stripe's SST files live; decides when to compact and performs the merge.
pub struct CompactionManager {
    // Stripe this manager is responsible for; embedded in output file names.
    stripe_id: usize,
    // Directory containing this stripe's SST files.
    dir: PathBuf,
}
237
238impl CompactionManager {
239    /// Create a new compaction manager
240    pub fn new(stripe_id: usize, dir: PathBuf) -> Self {
241        Self { stripe_id, dir }
242    }
243
244    /// Check if compaction is needed for this stripe
245    pub fn needs_compaction(&self, sst_count: usize) -> bool {
246        sst_count >= COMPACTION_THRESHOLD
247    }
248
249    /// Compact multiple SST files into a single new SST
250    ///
251    /// Algorithm:
252    /// 1. Read all records from all input SSTs
253    /// 2. Merge by key, keeping only the latest version (highest SeqNo)
254    /// 3. Filter out tombstones (deleted records)
255    /// 4. Write merged records to new SST
256    /// 5. Return new SST reader and paths of old SSTs to delete
257    pub fn compact(
258        &self,
259        ssts: &[SstReader],
260        next_sst_id: u64,
261    ) -> Result<(SstReader, Vec<PathBuf>)> {
262        if ssts.is_empty() {
263            return Err(Error::InvalidArgument("Cannot compact zero SSTs".into()));
264        }
265
266        // Step 1: Collect all records from all SSTs into a map
267        // Key: encoded key, Value: latest record for that key
268        let mut records_by_key: BTreeMap<Vec<u8>, Record> = BTreeMap::new();
269
270        for sst in ssts {
271            for record in sst.scan()? {
272                let encoded_key = record.key.encode().to_vec();
273
274                // Keep record with highest SeqNo (latest version)
275                records_by_key
276                    .entry(encoded_key)
277                    .and_modify(|existing| {
278                        if record.seq > existing.seq {
279                            *existing = record.clone();
280                        }
281                    })
282                    .or_insert(record);
283            }
284        }
285
286        // Step 2: Filter out tombstones and collect records to write
287        let mut records_to_write: Vec<Record> = records_by_key
288            .into_values()
289            .filter(|record| !record.is_tombstone())
290            .collect();
291
292        // Sort by encoded key (already sorted from BTreeMap, but ensure consistency)
293        records_to_write.sort_by(|a, b| a.key.encode().cmp(&b.key.encode()));
294
295        // Step 3: Write new SST
296        let new_sst_path = self.dir.join(format!("{:03}-{}.sst", self.stripe_id, next_sst_id));
297        let mut writer = SstWriter::new();
298
299        for record in records_to_write {
300            writer.add(record);
301        }
302
303        writer.finish(&new_sst_path)?;
304
305        // Step 4: Open new SST reader
306        let new_reader = SstReader::open(&new_sst_path)?;
307
308        // Step 5: Collect paths of old SSTs to delete
309        let old_sst_paths: Vec<PathBuf> = ssts
310            .iter()
311            .map(|sst| sst.path().to_path_buf())
312            .collect();
313
314        Ok((new_reader, old_sst_paths))
315    }
316
317    /// Delete old SST files after successful compaction
318    pub fn cleanup_old_ssts(&self, old_sst_paths: Vec<PathBuf>) -> Result<()> {
319        for path in old_sst_paths {
320            if path.exists() {
321                fs::remove_file(&path).map_err(|e| {
322                    Error::Internal(format!("Failed to delete old SST {:?}: {}", path, e))
323                })?;
324            }
325        }
326        Ok(())
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Key, Value, SeqNo, sst::SstWriter};
    use tempfile::TempDir;
    use std::collections::HashMap;

    /// Build a PUT record with a single "value" string attribute.
    fn create_test_record(pk: &[u8], seq: SeqNo, value: &str) -> Record {
        let mut item = HashMap::new();
        item.insert("value".to_string(), Value::string(value));
        Record::put(Key::new(pk.to_vec()), item, seq)
    }

    /// Build a tombstone (delete) record for the given key.
    fn create_delete_record(pk: &[u8], seq: SeqNo) -> Record {
        Record::delete(Key::new(pk.to_vec()), seq)
    }

    #[test]
    fn test_compaction_manager_needs_compaction() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Threshold is COMPACTION_THRESHOLD (10): strictly below -> no, at/above -> yes.
        assert!(!manager.needs_compaction(5));
        assert!(!manager.needs_compaction(9));
        assert!(manager.needs_compaction(10));
        assert!(manager.needs_compaction(15));
    }

    #[test]
    fn test_compact_multiple_versions() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Create SST 1: key1=v1 (seq 1), key2=v1 (seq 1)
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key2", 1, "v1"));
        writer1.finish(&sst1_path).unwrap();

        // Create SST 2: key1=v2 (seq 2), key3=v1 (seq 2)
        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_test_record(b"key1", 2, "v2"));
        writer2.add(create_test_record(b"key3", 2, "v1"));
        writer2.finish(&sst2_path).unwrap();

        // Compact
        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, old_paths) = manager.compact(&[sst1, sst2], 3).unwrap();

        // Verify: should have 3 keys, with key1 having latest version (v2)
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 3);

        // Find key1 and check that the seq-2 version won the merge.
        let key1_record = records.iter().find(|r| r.key.pk.as_ref() == b"key1").unwrap();
        assert_eq!(key1_record.seq, 2);
        assert_eq!(
            key1_record.value.as_ref().unwrap().get("value").unwrap().as_string(),
            Some("v2")
        );

        // Both input SSTs are reported for deletion.
        assert_eq!(old_paths.len(), 2);
    }

    #[test]
    fn test_compact_filters_tombstones() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Create SST 1: key1=v1 (seq 1), key2=v1 (seq 1)
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key2", 1, "v1"));
        writer1.finish(&sst1_path).unwrap();

        // Create SST 2: key1=DELETE (seq 2) — newer tombstone shadows the put.
        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_delete_record(b"key1", 2));
        writer2.finish(&sst2_path).unwrap();

        // Compact
        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1, sst2], 3).unwrap();

        // Verify: should only have key2 (key1 was deleted)
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.pk.as_ref(), b"key2");
    }

    #[test]
    fn test_compact_empty_result() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Create SST with only a deletion
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_delete_record(b"key1", 1));
        writer1.finish(&sst1_path).unwrap();

        // Compact a single SST that contains nothing but a tombstone.
        let sst1 = SstReader::open(&sst1_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1], 2).unwrap();

        // Verify: should be empty (all tombstones filtered)
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 0);
    }

    #[test]
    fn test_cleanup_old_ssts() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Create test files standing in for obsolete SSTs.
        let file1 = dir.path().join("000-1.sst");
        let file2 = dir.path().join("000-2.sst");
        std::fs::write(&file1, b"test").unwrap();
        std::fs::write(&file2, b"test").unwrap();

        assert!(file1.exists());
        assert!(file2.exists());

        // Cleanup should remove both files from disk.
        manager.cleanup_old_ssts(vec![file1.clone(), file2.clone()]).unwrap();

        assert!(!file1.exists());
        assert!(!file2.exists());
    }

    #[test]
    fn test_compact_preserves_order() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // Create SST with multiple keys in order
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key3", 1, "v3"));
        writer1.finish(&sst1_path).unwrap();

        // Second SST interleaves keys with the first one.
        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_test_record(b"key2", 2, "v2"));
        writer2.add(create_test_record(b"key4", 2, "v4"));
        writer2.finish(&sst2_path).unwrap();

        // Compact
        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1, sst2], 3).unwrap();

        // Verify: records should be sorted by key after the merge.
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 4);
        assert_eq!(records[0].key.pk.as_ref(), b"key1");
        assert_eq!(records[1].key.pk.as_ref(), b"key2");
        assert_eq!(records[2].key.pk.as_ref(), b"key3");
        assert_eq!(records[3].key.pk.as_ref(), b"key4");
    }

    #[test]
    fn test_compaction_config_defaults() {
        let config = CompactionConfig::default();
        assert!(config.enabled);
        assert_eq!(config.sst_threshold, DEFAULT_SST_THRESHOLD);
        assert_eq!(config.check_interval_secs, 60);
        assert_eq!(config.max_concurrent_compactions, 4);
    }

    #[test]
    fn test_compaction_config_disabled() {
        let config = CompactionConfig::disabled();
        assert!(!config.enabled);
    }

    #[test]
    fn test_compaction_config_builder() {
        let config = CompactionConfig::new()
            .with_sst_threshold(5)
            .with_check_interval(30)
            .with_max_concurrent(2);

        assert_eq!(config.sst_threshold, 5);
        assert_eq!(config.check_interval_secs, 30);
        assert_eq!(config.max_concurrent_compactions, 2);
    }

    #[test]
    fn test_compaction_stats_atomic_snapshot() {
        let stats = CompactionStatsAtomic::new();

        stats.record_ssts_merged(5);
        stats.record_ssts_created(1);
        stats.record_bytes_read(1024);
        stats.record_bytes_written(512);
        stats.record_tombstones_removed(3);

        // Snapshot must reflect every recorded counter.
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.total_ssts_merged, 5);
        assert_eq!(snapshot.total_ssts_created, 1);
        assert_eq!(snapshot.total_bytes_read, 1024);
        assert_eq!(snapshot.total_bytes_written, 512);
        assert_eq!(snapshot.total_tombstones_removed, 3);
    }

    #[test]
    fn test_compaction_guard_decrements_on_drop() {
        let stats = CompactionStatsAtomic::new();

        {
            let _guard = stats.start_compaction();
            assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 1);
            assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 1);
        }

        // Guard dropped, active should decrement
        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_multiple_compaction_guards() {
        let stats = CompactionStatsAtomic::new();

        let _guard1 = stats.start_compaction();
        let _guard2 = stats.start_compaction();
        let _guard3 = stats.start_compaction();

        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 3);
        assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 3);

        // Dropping one guard lowers only the active count.
        drop(_guard2);
        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 2);
    }
}
573}