// rustlite_storage/compaction.rs

//! Compaction - Background merging and level management
//!
//! Compaction merges SSTables to reduce read amplification and
//! reclaim space from deleted entries (tombstones).

6use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::collections::BinaryHeap;
10use std::cmp::Ordering;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14
/// Tuning knobs for the compaction process.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// How many level-0 SSTables may accumulate before a compaction is triggered.
    pub level0_trigger: usize,
    /// Growth factor between adjacent levels (level N+1 holds this many times level N).
    pub level_multiplier: usize,
    /// Byte budget for level 1; deeper levels scale up by `level_multiplier`.
    pub level1_max_size: u64,
    /// Total number of levels in the tree.
    pub max_levels: u32,
    /// Approximate size at which output SSTables are split during a merge.
    pub target_file_size: u64,
}
29
30impl Default for CompactionConfig {
31    fn default() -> Self {
32        Self {
33            level0_trigger: 4,
34            level_multiplier: 10,
35            level1_max_size: 10 * 1024 * 1024, // 10MB
36            max_levels: 7,
37            target_file_size: 2 * 1024 * 1024, // 2MB
38        }
39    }
40}
41
/// Running totals describing the compaction work performed so far.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Bytes read from input SSTables.
    pub bytes_read: u64,
    /// Bytes written to output SSTables.
    pub bytes_written: u64,
    /// How many compaction passes have completed.
    pub compaction_count: u64,
    /// Entries dropped during merges (superseded versions and tombstoned keys).
    pub entries_removed: u64,
}
54
/// A single entry participating in the k-way merge, tagged with the
/// index of the input SSTable it came from so duplicate keys can be
/// resolved by recency.
#[derive(Debug)]
struct MergeEntry {
    // Copy of the entry's key, kept separate so heap comparisons do not
    // reach into `entry`.
    key: Vec<u8>,
    // The full record read from the SSTable; this module uses its `key`
    // and `value` fields (see merge_sstables) -- NOTE(review): the exact
    // shape of SSTableEntry is defined in sstable.rs.
    entry: SSTableEntry,
    // Position of the source SSTable in the merge's input list; a higher
    // index is treated as the newer file when keys collide.
    source_idx: usize,
}
62
63impl PartialEq for MergeEntry {
64    fn eq(&self, other: &Self) -> bool {
65        self.key == other.key && self.source_idx == other.source_idx
66    }
67}
68
69impl Eq for MergeEntry {}
70
impl PartialOrd for MergeEntry {
    // Delegate to the total order below, keeping `PartialOrd` and `Ord`
    // consistent as the trait contract requires.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
76
77impl Ord for MergeEntry {
78    fn cmp(&self, other: &Self) -> Ordering {
79        // BinaryHeap is a max-heap, so we reverse key comparison for min-heap behavior
80        // For equal keys, higher source_idx (newer files) should come first
81        match other.key.cmp(&self.key) {
82            Ordering::Equal => self.source_idx.cmp(&other.source_idx),
83            ord => ord,
84        }
85    }
86}
87
88/// Compaction worker
89pub struct CompactionWorker {
90    /// Database directory
91    dir: PathBuf,
92    /// Configuration
93    config: CompactionConfig,
94    /// Statistics
95    stats: CompactionStats,
96    /// Counter for generating unique SSTable names
97    file_counter: AtomicU64,
98    /// Flag to stop compaction
99    stop_flag: Arc<AtomicBool>,
100}
101
102impl CompactionWorker {
103    /// Create a new compaction worker
104    pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
105        Self {
106            dir: dir.as_ref().to_path_buf(),
107            config,
108            stats: CompactionStats::default(),
109            file_counter: AtomicU64::new(0),
110            stop_flag: Arc::new(AtomicBool::new(false)),
111        }
112    }
113
114    /// Get the stop flag for external control
115    pub fn stop_flag(&self) -> Arc<AtomicBool> {
116        Arc::clone(&self.stop_flag)
117    }
118
119    /// Check if compaction is needed for level 0
120    pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
121        let level0_count = manifest.sstables_at_level(0).len();
122        level0_count >= self.config.level0_trigger
123    }
124
125    /// Check which level needs compaction
126    pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
127        // Check level 0 first
128        let level0_count = manifest.sstables_at_level(0).len();
129        if level0_count >= self.config.level0_trigger {
130            return Some(0);
131        }
132        
133        // Check other levels by size
134        for level in 1..self.config.max_levels {
135            let level_size: u64 = manifest.sstables_at_level(level)
136                .iter()
137                .map(|s| s.file_size)
138                .sum();
139            
140            let max_size = self.max_size_for_level(level);
141            if level_size > max_size {
142                return Some(level);
143            }
144        }
145        
146        None
147    }
148
149    /// Get the maximum size for a level
150    fn max_size_for_level(&self, level: u32) -> u64 {
151        if level == 0 {
152            return u64::MAX; // Level 0 is count-based, not size-based
153        }
154        
155        let mut size = self.config.level1_max_size;
156        for _ in 1..level {
157            size *= self.config.level_multiplier as u64;
158        }
159        size
160    }
161
162    /// Generate a unique SSTable path
163    fn next_sstable_path(&self, level: u32) -> PathBuf {
164        let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
165        let timestamp = std::time::SystemTime::now()
166            .duration_since(std::time::UNIX_EPOCH)
167            .unwrap_or_default()
168            .as_millis();
169        
170        self.dir.join("sst").join(format!(
171            "L{}_{}_{}.sst",
172            level, timestamp, counter
173        ))
174    }
175
176    /// Compact level 0 to level 1
177    pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
178        let level0_sstables = manifest.sstables_at_level(0);
179        if level0_sstables.is_empty() {
180            return Ok(());
181        }
182        
183        // Collect all level 0 SSTable paths
184        let input_paths: Vec<PathBuf> = level0_sstables
185            .iter()
186            .map(|s| PathBuf::from(&s.path))
187            .collect();
188        
189        // Find overlapping level 1 SSTables
190        let level1_sstables = manifest.sstables_at_level(1);
191        
192        // For simplicity, merge all level 0 with overlapping level 1
193        let mut all_inputs: Vec<PathBuf> = input_paths.clone();
194        
195        // Get min/max key range from level 0
196        let min_key: Vec<u8> = level0_sstables.iter()
197            .map(|s| s.min_key.clone())
198            .min()
199            .unwrap_or_default();
200        let max_key: Vec<u8> = level0_sstables.iter()
201            .map(|s| s.max_key.clone())
202            .max()
203            .unwrap_or_default();
204        
205        // Add overlapping level 1 SSTables
206        for sst in level1_sstables {
207            if sst.max_key >= min_key && sst.min_key <= max_key {
208                all_inputs.push(PathBuf::from(&sst.path));
209            }
210        }
211        
212        // Perform the merge
213        let outputs = self.merge_sstables(&all_inputs, 1)?;
214        
215        // Update manifest
216        manifest.record_compaction(0, all_inputs.clone(), outputs)?;
217        
218        // Delete old files
219        for path in all_inputs {
220            let _ = delete_sstable(&path);
221        }
222        
223        self.stats.compaction_count += 1;
224        
225        Ok(())
226    }
227
228    /// Merge multiple SSTables into new SSTables at the target level
229    fn merge_sstables(&mut self, inputs: &[PathBuf], target_level: u32) -> Result<Vec<SSTableMeta>> {
230        if inputs.is_empty() {
231            return Ok(Vec::new());
232        }
233        
234        // Create SST directory if needed
235        let sst_dir = self.dir.join("sst");
236        std::fs::create_dir_all(&sst_dir)?;
237        
238        // Open all input SSTables
239        let mut readers: Vec<SSTableReader> = Vec::new();
240        for path in inputs {
241            if path.exists() {
242                match SSTableReader::open(path) {
243                    Ok(reader) => {
244                        self.stats.bytes_read += reader.metadata().file_size;
245                        readers.push(reader);
246                    }
247                    Err(_) => continue, // Skip corrupted files
248                }
249            }
250        }
251        
252        if readers.is_empty() {
253            return Ok(Vec::new());
254        }
255        
256        // Initialize merge heap
257        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
258        let mut iterators: Vec<_> = readers.iter_mut()
259            .map(|r| r.iter())
260            .collect::<Result<Vec<_>>>()?;
261        
262        // Prime the heap with first entry from each SSTable
263        for (idx, iter) in iterators.iter_mut().enumerate() {
264            if let Some(entry) = iter.next_entry()? {
265                heap.push(MergeEntry {
266                    key: entry.key.clone(),
267                    entry,
268                    source_idx: idx,
269                });
270            }
271        }
272        
273        // Output SSTables
274        let mut outputs: Vec<SSTableMeta> = Vec::new();
275        let mut current_writer: Option<SSTableWriter> = None;
276        let mut current_size: u64 = 0;
277        let mut last_key: Option<Vec<u8>> = None;
278        
279        while let Some(merge_entry) = heap.pop() {
280            // Skip duplicate keys (keep the newest - higher source_idx)
281            if last_key.as_ref() == Some(&merge_entry.key) {
282                self.stats.entries_removed += 1;
283                // Advance the iterator that provided this entry
284                if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
285                    heap.push(MergeEntry {
286                        key: next.key.clone(),
287                        entry: next,
288                        source_idx: merge_entry.source_idx,
289                    });
290                }
291                continue;
292            }
293            
294            // Start a new SSTable if needed
295            if current_writer.is_none() || current_size >= self.config.target_file_size {
296                // Finish current writer
297                if let Some(writer) = current_writer.take() {
298                    let meta = writer.finish()?;
299                    self.stats.bytes_written += meta.file_size;
300                    outputs.push(meta);
301                }
302                
303                // Start new writer
304                let path = self.next_sstable_path(target_level);
305                current_writer = Some(SSTableWriter::new(&path)?);
306                current_size = 0;
307            }
308            
309            // Write entry
310            if let Some(ref mut writer) = current_writer {
311                let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
312                writer.add(merge_entry.entry.clone())?;
313                current_size += entry_size as u64;
314            }
315            
316            last_key = Some(merge_entry.key);
317            
318            // Advance the iterator that provided this entry
319            if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
320                heap.push(MergeEntry {
321                    key: next.key.clone(),
322                    entry: next,
323                    source_idx: merge_entry.source_idx,
324                });
325            }
326        }
327        
328        // Finish last writer
329        if let Some(writer) = current_writer {
330            let meta = writer.finish()?;
331            self.stats.bytes_written += meta.file_size;
332            outputs.push(meta);
333        }
334        
335        // Update level in output metadata
336        let outputs: Vec<SSTableMeta> = outputs.into_iter()
337            .map(|mut m| {
338                m.level = target_level;
339                m
340            })
341            .collect();
342        
343        Ok(outputs)
344    }
345
346    /// Get compaction statistics
347    pub fn stats(&self) -> &CompactionStats {
348        &self.stats
349    }
350
351    /// Run a single compaction pass
352    pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
353        if self.stop_flag.load(AtomicOrdering::Relaxed) {
354            return Ok(false);
355        }
356        
357        if let Some(level) = self.pick_compaction_level(manifest) {
358            if level == 0 {
359                self.compact_level0(manifest)?;
360                return Ok(true);
361            }
362            // TODO: Implement higher level compaction
363        }
364        
365        Ok(false)
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    #[test]
    fn test_compaction_config_default() {
        let cfg = CompactionConfig::default();
        assert_eq!(cfg.level0_trigger, 4);
        assert_eq!(cfg.max_levels, 7);
    }

    #[test]
    fn test_merge_entry_ordering() {
        let make = |key: &[u8], val: &[u8]| MergeEntry {
            key: key.to_vec(),
            entry: SSTableEntry::value(key.to_vec(), val.to_vec()),
            source_idx: 0,
        };
        let smaller_key = make(b"a", b"1");
        let larger_key = make(b"b", b"2");

        // The Ord impl is reversed so BinaryHeap (a max-heap) pops the
        // smallest key first; hence the smaller key compares "greater".
        assert!(smaller_key > larger_key);
    }

    #[test]
    fn test_needs_compaction() {
        let dir = tempdir().unwrap();
        let worker = CompactionWorker::new(
            dir.path(),
            CompactionConfig {
                level0_trigger: 2,
                ..Default::default()
            },
        );
        let mut manifest = Manifest::open(dir.path()).unwrap();

        // Empty manifest: nothing to compact yet.
        assert!(!worker.needs_compaction(&manifest));

        // Register enough level-0 SSTables to hit the trigger.
        for i in 0..2 {
            manifest
                .add_sstable(&SSTableMeta {
                    path: PathBuf::from(format!("test{}.sst", i)),
                    min_key: vec![],
                    max_key: vec![],
                    entry_count: 0,
                    file_size: 0,
                    level: 0,
                    sequence: 0,
                })
                .unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    #[test]
    fn test_merge_sstables() {
        let dir = tempdir().unwrap();
        let sst_dir = dir.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        // Two inputs sharing key "c"; the later input (higher source
        // index) must win the merge.
        let first = sst_dir.join("test1.sst");
        let mut w = SSTableWriter::new(&first).unwrap();
        w.add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec())).unwrap();
        w.add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec())).unwrap();
        w.finish().unwrap();

        let second = sst_dir.join("test2.sst");
        let mut w = SSTableWriter::new(&second).unwrap();
        w.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec())).unwrap();
        w.add(SSTableEntry::value(b"c".to_vec(), b"3-new".to_vec())).unwrap(); // Overwrites
        w.finish().unwrap();

        let mut worker = CompactionWorker::new(dir.path(), CompactionConfig::default());
        let outputs = worker.merge_sstables(&[first, second], 1).unwrap();

        assert!(!outputs.is_empty());

        let mut reader = SSTableReader::open(&outputs[0].path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(reader.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        // "c" must carry the newer value from the second SSTable.
        assert_eq!(reader.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}