// rustlite_storage/compaction.rs

//! Compaction - Background merging and level management
//!
//! Compaction merges SSTables to reduce read amplification and
//! reclaim space from deleted entries (tombstones).

6use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14use tracing::{info, instrument, warn};
15
/// Compaction configuration
///
/// Tuning knobs for the leveled-compaction strategy: when level 0
/// triggers (by file count), how fast deeper levels grow (by size),
/// and how large individual output files are allowed to get.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Maximum number of SSTables at level 0 before triggering compaction
    pub level0_trigger: usize,
    /// Size multiplier between levels (e.g., 10 means level N+1 is 10x larger)
    pub level_multiplier: usize,
    /// Maximum size for level 1 in bytes
    pub level1_max_size: u64,
    /// Maximum number of levels
    pub max_levels: u32,
    /// Target file size for output SSTables (merge output rolls over past this)
    pub target_file_size: u64,
}

31impl Default for CompactionConfig {
32    fn default() -> Self {
33        Self {
34            level0_trigger: 4,
35            level_multiplier: 10,
36            level1_max_size: 10 * 1024 * 1024, // 10MB
37            max_levels: 7,
38            target_file_size: 2 * 1024 * 1024, // 2MB
39        }
40    }
41}
42
/// Statistics for compaction
///
/// Cumulative counters since the worker was created; updated by the
/// merge path and exposed read-only through `CompactionWorker::stats()`.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Total bytes read during compaction
    pub bytes_read: u64,
    /// Total bytes written during compaction
    pub bytes_written: u64,
    /// Number of compactions performed
    pub compaction_count: u64,
    /// Number of entries dropped during merges (older versions shadowed by
    /// newer ones; tombstone purging is not part of the visible merge path)
    pub entries_removed: u64,
}

/// Entry for merge iterator (with ordering)
///
/// Wraps one SSTable entry together with the index of the reader it came
/// from, so the k-way merge heap can order by key and break ties by source.
#[derive(Debug)]
struct MergeEntry {
    // Copy of entry.key kept at top level for cheap heap comparisons
    key: Vec<u8>,
    // The underlying SSTable entry being merged
    entry: SSTableEntry,
    // Index into the reader list; ties on equal keys are broken so the
    // highest index (intended to be the newest input) pops first
    source_idx: usize,
}

64impl PartialEq for MergeEntry {
65    fn eq(&self, other: &Self) -> bool {
66        self.key == other.key && self.source_idx == other.source_idx
67    }
68}
69
70impl Eq for MergeEntry {}
71
72impl PartialOrd for MergeEntry {
73    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for MergeEntry {
79    fn cmp(&self, other: &Self) -> Ordering {
80        // BinaryHeap is a max-heap, so we reverse key comparison for min-heap behavior
81        // For equal keys, higher source_idx (newer files) should come first
82        match other.key.cmp(&self.key) {
83            Ordering::Equal => self.source_idx.cmp(&other.source_idx),
84            ord => ord,
85        }
86    }
87}
88
/// Compaction worker
///
/// Owns the merge machinery: decides which level needs compacting, merges
/// the input SSTables, and tracks cumulative statistics. Intended to be
/// driven repeatedly via `run_once` and stopped through the shared
/// `stop_flag` (hence the `Arc<AtomicBool>`).
pub struct CompactionWorker {
    /// Database directory
    dir: PathBuf,
    /// Configuration
    config: CompactionConfig,
    /// Statistics
    stats: CompactionStats,
    /// Counter for generating unique SSTable names
    file_counter: AtomicU64,
    /// Flag to stop compaction
    stop_flag: Arc<AtomicBool>,
}

103impl CompactionWorker {
104    /// Create a new compaction worker
105    pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
106        Self {
107            dir: dir.as_ref().to_path_buf(),
108            config,
109            stats: CompactionStats::default(),
110            file_counter: AtomicU64::new(0),
111            stop_flag: Arc::new(AtomicBool::new(false)),
112        }
113    }
114
115    /// Get the stop flag for external control
116    pub fn stop_flag(&self) -> Arc<AtomicBool> {
117        Arc::clone(&self.stop_flag)
118    }
119
120    /// Check if compaction is needed for level 0
121    pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
122        let level0_count = manifest.sstables_at_level(0).len();
123        level0_count >= self.config.level0_trigger
124    }
125
126    /// Check which level needs compaction
127    pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
128        // Check level 0 first
129        let level0_count = manifest.sstables_at_level(0).len();
130        if level0_count >= self.config.level0_trigger {
131            return Some(0);
132        }
133
134        // Check other levels by size
135        for level in 1..self.config.max_levels {
136            let level_size: u64 = manifest
137                .sstables_at_level(level)
138                .iter()
139                .map(|s| s.file_size)
140                .sum();
141
142            let max_size = self.max_size_for_level(level);
143            if level_size > max_size {
144                return Some(level);
145            }
146        }
147
148        None
149    }
150
151    /// Get the maximum size for a level
152    fn max_size_for_level(&self, level: u32) -> u64 {
153        if level == 0 {
154            return u64::MAX; // Level 0 is count-based, not size-based
155        }
156
157        let mut size = self.config.level1_max_size;
158        for _ in 1..level {
159            size *= self.config.level_multiplier as u64;
160        }
161        size
162    }
163
164    /// Generate a unique SSTable path
165    fn next_sstable_path(&self, level: u32) -> PathBuf {
166        let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
167        let timestamp = std::time::SystemTime::now()
168            .duration_since(std::time::UNIX_EPOCH)
169            .unwrap_or_default()
170            .as_millis();
171
172        self.dir
173            .join("sst")
174            .join(format!("L{}_{}_{}.sst", level, timestamp, counter))
175    }
176
177    /// Compact level 0 to level 1
178    #[instrument(skip(self, manifest))]
179    pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
180        let level0_sstables = manifest.sstables_at_level(0);
181        if level0_sstables.is_empty() {
182            return Ok(());
183        }
184
185        info!(
186            level0_count = level0_sstables.len(),
187            "Starting level 0 compaction"
188        );
189
190        // Collect all level 0 SSTable paths
191        let input_paths: Vec<PathBuf> = level0_sstables
192            .iter()
193            .map(|s| PathBuf::from(&s.path))
194            .collect();
195
196        // Find overlapping level 1 SSTables
197        let level1_sstables = manifest.sstables_at_level(1);
198
199        // For simplicity, merge all level 0 with overlapping level 1
200        let mut all_inputs: Vec<PathBuf> = input_paths.clone();
201
202        // Get min/max key range from level 0
203        let min_key: Vec<u8> = level0_sstables
204            .iter()
205            .map(|s| s.min_key.clone())
206            .min()
207            .unwrap_or_default();
208        let max_key: Vec<u8> = level0_sstables
209            .iter()
210            .map(|s| s.max_key.clone())
211            .max()
212            .unwrap_or_default();
213
214        // Add overlapping level 1 SSTables
215        for sst in level1_sstables {
216            if sst.max_key >= min_key && sst.min_key <= max_key {
217                all_inputs.push(PathBuf::from(&sst.path));
218            }
219        }
220
221        // Perform the merge
222        let outputs = self.merge_sstables(&all_inputs, 1)?;
223
224        // Update manifest
225        manifest.record_compaction(0, all_inputs.clone(), outputs)?;
226
227        // Delete old files
228        for path in all_inputs {
229            let _ = delete_sstable(&path);
230        }
231
232        self.stats.compaction_count += 1;
233
234        Ok(())
235    }
236
237    /// Merge multiple SSTables into new SSTables at the target level
238    fn merge_sstables(
239        &mut self,
240        inputs: &[PathBuf],
241        target_level: u32,
242    ) -> Result<Vec<SSTableMeta>> {
243        if inputs.is_empty() {
244            return Ok(Vec::new());
245        }
246
247        // Create SST directory if needed
248        let sst_dir = self.dir.join("sst");
249        std::fs::create_dir_all(&sst_dir)?;
250
251        // Open all input SSTables
252        let mut readers: Vec<SSTableReader> = Vec::new();
253        for path in inputs {
254            if path.exists() {
255                match SSTableReader::open(path) {
256                    Ok(reader) => {
257                        self.stats.bytes_read += reader.metadata().file_size;
258                        readers.push(reader);
259                    }
260                    Err(_) => continue, // Skip corrupted files
261                }
262            }
263        }
264
265        if readers.is_empty() {
266            return Ok(Vec::new());
267        }
268
269        // Initialize merge heap
270        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
271        let mut iterators: Vec<_> = readers
272            .iter_mut()
273            .map(|r| r.iter())
274            .collect::<Result<Vec<_>>>()?;
275
276        // Prime the heap with first entry from each SSTable
277        for (idx, iter) in iterators.iter_mut().enumerate() {
278            if let Some(entry) = iter.next_entry()? {
279                heap.push(MergeEntry {
280                    key: entry.key.clone(),
281                    entry,
282                    source_idx: idx,
283                });
284            }
285        }
286
287        // Output SSTables
288        let mut outputs: Vec<SSTableMeta> = Vec::new();
289        let mut current_writer: Option<SSTableWriter> = None;
290        let mut current_size: u64 = 0;
291        let mut last_key: Option<Vec<u8>> = None;
292
293        while let Some(merge_entry) = heap.pop() {
294            // Skip duplicate keys (keep the newest - higher source_idx)
295            if last_key.as_ref() == Some(&merge_entry.key) {
296                self.stats.entries_removed += 1;
297                // Advance the iterator that provided this entry
298                if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
299                    heap.push(MergeEntry {
300                        key: next.key.clone(),
301                        entry: next,
302                        source_idx: merge_entry.source_idx,
303                    });
304                }
305                continue;
306            }
307
308            // Start a new SSTable if needed
309            if current_writer.is_none() || current_size >= self.config.target_file_size {
310                // Finish current writer
311                if let Some(writer) = current_writer.take() {
312                    let meta = writer.finish()?;
313                    self.stats.bytes_written += meta.file_size;
314                    outputs.push(meta);
315                }
316
317                // Start new writer
318                let path = self.next_sstable_path(target_level);
319                current_writer = Some(SSTableWriter::new(&path)?);
320                current_size = 0;
321            }
322
323            // Write entry
324            if let Some(ref mut writer) = current_writer {
325                let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
326                writer.add(merge_entry.entry.clone())?;
327                current_size += entry_size as u64;
328            }
329
330            last_key = Some(merge_entry.key);
331
332            // Advance the iterator that provided this entry
333            if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
334                heap.push(MergeEntry {
335                    key: next.key.clone(),
336                    entry: next,
337                    source_idx: merge_entry.source_idx,
338                });
339            }
340        }
341
342        // Finish last writer
343        if let Some(writer) = current_writer {
344            let meta = writer.finish()?;
345            self.stats.bytes_written += meta.file_size;
346            outputs.push(meta);
347        }
348
349        // Update level in output metadata
350        let outputs: Vec<SSTableMeta> = outputs
351            .into_iter()
352            .map(|mut m| {
353                m.level = target_level;
354                m
355            })
356            .collect();
357
358        Ok(outputs)
359    }
360
361    /// Get compaction statistics
362    pub fn stats(&self) -> &CompactionStats {
363        &self.stats
364    }
365
366    /// Run a single compaction pass
367    pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
368        if self.stop_flag.load(AtomicOrdering::Relaxed) {
369            return Ok(false);
370        }
371
372        if let Some(level) = self.pick_compaction_level(manifest) {
373            if level == 0 {
374                self.compact_level0(manifest)?;
375                return Ok(true);
376            }
377            // TODO: Implement higher level compaction
378        }
379
380        Ok(false)
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    // Sanity-check the documented default tuning values.
    #[test]
    fn test_compaction_config_default() {
        let config = CompactionConfig::default();
        assert_eq!(config.level0_trigger, 4);
        assert_eq!(config.max_levels, 7);
    }

    // MergeEntry reverses key order so BinaryHeap (a max-heap) behaves as a
    // min-heap over keys: the smaller key must compare as "greater".
    #[test]
    fn test_merge_entry_ordering() {
        let e1 = MergeEntry {
            key: b"a".to_vec(),
            entry: SSTableEntry::value(b"a".to_vec(), b"1".to_vec()),
            source_idx: 0,
        };
        let e2 = MergeEntry {
            key: b"b".to_vec(),
            entry: SSTableEntry::value(b"b".to_vec(), b"2".to_vec()),
            source_idx: 0,
        };

        // In a max-heap, larger values come first
        // We want min-heap behavior, so e1 (key "a") should be "greater"
        assert!(e1 > e2);
    }

    // The count-based level-0 trigger fires once the manifest holds
    // `level0_trigger` SSTables at level 0 (here lowered to 2).
    #[test]
    fn test_needs_compaction() {
        let dir = tempdir().unwrap();
        let config = CompactionConfig {
            level0_trigger: 2,
            ..Default::default()
        };
        let worker = CompactionWorker::new(dir.path(), config);
        let mut manifest = Manifest::open(dir.path()).unwrap();

        assert!(!worker.needs_compaction(&manifest));

        // Add level 0 SSTables
        for i in 0..2 {
            let meta = SSTableMeta {
                path: PathBuf::from(format!("test{}.sst", i)),
                min_key: vec![],
                max_key: vec![],
                entry_count: 0,
                file_size: 0,
                level: 0,
                sequence: 0,
            };
            manifest.add_sstable(&meta).unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    // End-to-end merge of two overlapping SSTables: unique keys survive and
    // the later input (higher source index, i.e. newer) wins for key "c".
    #[test]
    fn test_merge_sstables() {
        let dir = tempdir().unwrap();
        let sst_dir = dir.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        // Create two SSTables with overlapping keys
        let path1 = sst_dir.join("test1.sst");
        let mut writer1 = SSTableWriter::new(&path1).unwrap();
        writer1
            .add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
            .unwrap();
        writer1
            .add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec()))
            .unwrap();
        writer1.finish().unwrap();

        let path2 = sst_dir.join("test2.sst");
        let mut writer2 = SSTableWriter::new(&path2).unwrap();
        writer2
            .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
            .unwrap();
        writer2
            .add(SSTableEntry::value(b"c".to_vec(), b"3-new".to_vec()))
            .unwrap(); // Overwrites
        writer2.finish().unwrap();

        // Merge
        let config = CompactionConfig::default();
        let mut worker = CompactionWorker::new(dir.path(), config);
        let outputs = worker.merge_sstables(&[path1, path2], 1).unwrap();

        assert!(!outputs.is_empty());

        // Verify merged content
        let mut reader = SSTableReader::open(&outputs[0].path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(reader.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        // "c" should have the newer value from the second SSTable
        assert_eq!(reader.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}