// rustlite_storage/compaction.rs

1//! Compaction - Background merging and level management
2//!
3//! Compaction merges SSTables to reduce read amplification and
4//! reclaim space from deleted entries (tombstones).
5
6use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14
/// Compaction configuration
///
/// Level 0 is triggered by file *count* (`level0_trigger`); levels 1 and
/// above are triggered by total *size* (`level1_max_size` grown by
/// `level_multiplier` per level). Sensible defaults come from `Default`.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Maximum number of SSTables at level 0 before triggering compaction
    pub level0_trigger: usize,
    /// Size multiplier between levels (e.g., 10 means level N+1 is 10x larger)
    pub level_multiplier: usize,
    /// Maximum size for level 1 in bytes
    pub level1_max_size: u64,
    /// Maximum number of levels
    pub max_levels: u32,
    /// Target file size for output SSTables written during a merge
    pub target_file_size: u64,
}
29
30impl Default for CompactionConfig {
31    fn default() -> Self {
32        Self {
33            level0_trigger: 4,
34            level_multiplier: 10,
35            level1_max_size: 10 * 1024 * 1024, // 10MB
36            max_levels: 7,
37            target_file_size: 2 * 1024 * 1024, // 2MB
38        }
39    }
40}
41
/// Statistics for compaction
///
/// Cumulative counters over the lifetime of a worker, exposed via
/// `CompactionWorker::stats`.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Total bytes read during compaction
    pub bytes_read: u64,
    /// Total bytes written during compaction
    pub bytes_written: u64,
    /// Number of compactions performed
    pub compaction_count: u64,
    /// Number of entries dropped during merges (currently only older
    /// duplicates of overwritten keys are counted)
    pub entries_removed: u64,
}
54
/// Entry for merge iterator (with ordering)
///
/// Wraps an [`SSTableEntry`] with enough context for the k-way merge heap:
/// the key drives ordering and `source_idx` breaks ties between inputs.
#[derive(Debug)]
struct MergeEntry {
    /// Copy of the entry's key, used for heap ordering.
    key: Vec<u8>,
    /// The full entry being merged.
    entry: SSTableEntry,
    /// Index into the input reader list; on duplicate keys the entry with
    /// the higher index wins (treated as the newer value).
    source_idx: usize,
}
62
impl PartialEq for MergeEntry {
    // Equality covers both the key and the originating input so that two
    // entries with the same key from different sources are NOT equal — they
    // are instead ordered by `Ord`'s source_idx tie-break.
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key && self.source_idx == other.source_idx
    }
}

impl Eq for MergeEntry {}
70
impl PartialOrd for MergeEntry {
    // Delegates to `Ord` (total order), as required for use in BinaryHeap.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
76
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap, so we reverse key comparison for min-heap behavior
        // (pop() then yields the smallest key, which is what the merge needs).
        // For equal keys, higher source_idx (newer files) should come first:
        // the index comparison is NOT reversed, so the larger index compares
        // "greater" and is popped before lower-index duplicates, which the
        // merge loop subsequently discards as stale.
        match other.key.cmp(&self.key) {
            Ordering::Equal => self.source_idx.cmp(&other.source_idx),
            ord => ord,
        }
    }
}
87
/// Compaction worker
///
/// Owns the merge machinery: decides which level needs compaction, performs
/// the k-way merge into new SSTables, and accumulates statistics.
/// Cooperative shutdown is signalled through the shared `stop_flag`.
pub struct CompactionWorker {
    /// Database directory (output SSTables go under `<dir>/sst`)
    dir: PathBuf,
    /// Configuration
    config: CompactionConfig,
    /// Statistics
    stats: CompactionStats,
    /// Counter for generating unique SSTable names
    file_counter: AtomicU64,
    /// Flag to stop compaction; clones are handed out via `stop_flag()`
    stop_flag: Arc<AtomicBool>,
}
101
102impl CompactionWorker {
103    /// Create a new compaction worker
104    pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
105        Self {
106            dir: dir.as_ref().to_path_buf(),
107            config,
108            stats: CompactionStats::default(),
109            file_counter: AtomicU64::new(0),
110            stop_flag: Arc::new(AtomicBool::new(false)),
111        }
112    }
113
    /// Get the stop flag for external control
    ///
    /// Returns a clone of the shared flag; storing `true` into it makes
    /// subsequent `run_once` calls return early without compacting.
    pub fn stop_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.stop_flag)
    }
118
119    /// Check if compaction is needed for level 0
120    pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
121        let level0_count = manifest.sstables_at_level(0).len();
122        level0_count >= self.config.level0_trigger
123    }
124
125    /// Check which level needs compaction
126    pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
127        // Check level 0 first
128        let level0_count = manifest.sstables_at_level(0).len();
129        if level0_count >= self.config.level0_trigger {
130            return Some(0);
131        }
132
133        // Check other levels by size
134        for level in 1..self.config.max_levels {
135            let level_size: u64 = manifest
136                .sstables_at_level(level)
137                .iter()
138                .map(|s| s.file_size)
139                .sum();
140
141            let max_size = self.max_size_for_level(level);
142            if level_size > max_size {
143                return Some(level);
144            }
145        }
146
147        None
148    }
149
150    /// Get the maximum size for a level
151    fn max_size_for_level(&self, level: u32) -> u64 {
152        if level == 0 {
153            return u64::MAX; // Level 0 is count-based, not size-based
154        }
155
156        let mut size = self.config.level1_max_size;
157        for _ in 1..level {
158            size *= self.config.level_multiplier as u64;
159        }
160        size
161    }
162
163    /// Generate a unique SSTable path
164    fn next_sstable_path(&self, level: u32) -> PathBuf {
165        let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
166        let timestamp = std::time::SystemTime::now()
167            .duration_since(std::time::UNIX_EPOCH)
168            .unwrap_or_default()
169            .as_millis();
170
171        self.dir
172            .join("sst")
173            .join(format!("L{}_{}_{}.sst", level, timestamp, counter))
174    }
175
176    /// Compact level 0 to level 1
177    pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
178        let level0_sstables = manifest.sstables_at_level(0);
179        if level0_sstables.is_empty() {
180            return Ok(());
181        }
182
183        // Collect all level 0 SSTable paths
184        let input_paths: Vec<PathBuf> = level0_sstables
185            .iter()
186            .map(|s| PathBuf::from(&s.path))
187            .collect();
188
189        // Find overlapping level 1 SSTables
190        let level1_sstables = manifest.sstables_at_level(1);
191
192        // For simplicity, merge all level 0 with overlapping level 1
193        let mut all_inputs: Vec<PathBuf> = input_paths.clone();
194
195        // Get min/max key range from level 0
196        let min_key: Vec<u8> = level0_sstables
197            .iter()
198            .map(|s| s.min_key.clone())
199            .min()
200            .unwrap_or_default();
201        let max_key: Vec<u8> = level0_sstables
202            .iter()
203            .map(|s| s.max_key.clone())
204            .max()
205            .unwrap_or_default();
206
207        // Add overlapping level 1 SSTables
208        for sst in level1_sstables {
209            if sst.max_key >= min_key && sst.min_key <= max_key {
210                all_inputs.push(PathBuf::from(&sst.path));
211            }
212        }
213
214        // Perform the merge
215        let outputs = self.merge_sstables(&all_inputs, 1)?;
216
217        // Update manifest
218        manifest.record_compaction(0, all_inputs.clone(), outputs)?;
219
220        // Delete old files
221        for path in all_inputs {
222            let _ = delete_sstable(&path);
223        }
224
225        self.stats.compaction_count += 1;
226
227        Ok(())
228    }
229
    /// Merge multiple SSTables into new SSTables at the target level
    ///
    /// Performs a k-way heap merge of all entries in `inputs`. When the same
    /// key appears in several inputs, only the entry from the input with the
    /// HIGHEST index in `inputs` is kept (see `MergeEntry`'s `Ord`), so
    /// callers must pass inputs ordered oldest-first. Output is rolled over
    /// to a fresh file whenever the running size estimate reaches
    /// `target_file_size`. Missing or unopenable input files are skipped.
    ///
    /// Returns metadata for the newly written SSTables with `level` set to
    /// `target_level`, or any I/O error encountered while reading/writing.
    ///
    /// NOTE(review): tombstone entries appear to be copied through rather
    /// than dropped — confirm whether they should be elided when
    /// `target_level` is the bottom-most level.
    fn merge_sstables(
        &mut self,
        inputs: &[PathBuf],
        target_level: u32,
    ) -> Result<Vec<SSTableMeta>> {
        if inputs.is_empty() {
            return Ok(Vec::new());
        }

        // Create SST directory if needed
        let sst_dir = self.dir.join("sst");
        std::fs::create_dir_all(&sst_dir)?;

        // Open all input SSTables
        let mut readers: Vec<SSTableReader> = Vec::new();
        for path in inputs {
            if path.exists() {
                match SSTableReader::open(path) {
                    Ok(reader) => {
                        self.stats.bytes_read += reader.metadata().file_size;
                        readers.push(reader);
                    }
                    Err(_) => continue, // Skip corrupted files
                }
            }
        }

        if readers.is_empty() {
            return Ok(Vec::new());
        }

        // Initialize merge heap. MergeEntry reverses the key comparison, so
        // this max-heap behaves as a min-heap: pop() yields the smallest key.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        let mut iterators: Vec<_> = readers
            .iter_mut()
            .map(|r| r.iter())
            .collect::<Result<Vec<_>>>()?;

        // Prime the heap with first entry from each SSTable
        for (idx, iter) in iterators.iter_mut().enumerate() {
            if let Some(entry) = iter.next_entry()? {
                heap.push(MergeEntry {
                    key: entry.key.clone(),
                    entry,
                    source_idx: idx,
                });
            }
        }

        // Output SSTables
        let mut outputs: Vec<SSTableMeta> = Vec::new();
        let mut current_writer: Option<SSTableWriter> = None;
        let mut current_size: u64 = 0;
        // Last key written — used to discard older duplicates of that key.
        let mut last_key: Option<Vec<u8>> = None;

        while let Some(merge_entry) = heap.pop() {
            // Skip duplicate keys (keep the newest - higher source_idx).
            // For equal keys the heap pops the highest source_idx first, so
            // any entry matching last_key here is an older copy.
            if last_key.as_ref() == Some(&merge_entry.key) {
                self.stats.entries_removed += 1;
                // Advance the iterator that provided this entry
                if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
                    heap.push(MergeEntry {
                        key: next.key.clone(),
                        entry: next,
                        source_idx: merge_entry.source_idx,
                    });
                }
                continue;
            }

            // Start a new SSTable if needed
            if current_writer.is_none() || current_size >= self.config.target_file_size {
                // Finish current writer
                if let Some(writer) = current_writer.take() {
                    let meta = writer.finish()?;
                    self.stats.bytes_written += meta.file_size;
                    outputs.push(meta);
                }

                // Start new writer
                let path = self.next_sstable_path(target_level);
                current_writer = Some(SSTableWriter::new(&path)?);
                current_size = 0;
            }

            // Write entry. The +10 is a rough allowance for per-entry framing
            // overhead; current_size only drives the file-rollover heuristic,
            // so it need not match the on-disk size exactly.
            if let Some(ref mut writer) = current_writer {
                let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
                writer.add(merge_entry.entry.clone())?;
                current_size += entry_size as u64;
            }

            last_key = Some(merge_entry.key);

            // Advance the iterator that provided this entry
            if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
                heap.push(MergeEntry {
                    key: next.key.clone(),
                    entry: next,
                    source_idx: merge_entry.source_idx,
                });
            }
        }

        // Finish last writer
        if let Some(writer) = current_writer {
            let meta = writer.finish()?;
            self.stats.bytes_written += meta.file_size;
            outputs.push(meta);
        }

        // Update level in output metadata
        let outputs: Vec<SSTableMeta> = outputs
            .into_iter()
            .map(|mut m| {
                m.level = target_level;
                m
            })
            .collect();

        Ok(outputs)
    }
353
    /// Get compaction statistics
    ///
    /// Returns a reference to the cumulative counters; values only grow over
    /// the worker's lifetime.
    pub fn stats(&self) -> &CompactionStats {
        &self.stats
    }
358
359    /// Run a single compaction pass
360    pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
361        if self.stop_flag.load(AtomicOrdering::Relaxed) {
362            return Ok(false);
363        }
364
365        if let Some(level) = self.pick_compaction_level(manifest) {
366            if level == 0 {
367                self.compact_level0(manifest)?;
368                return Ok(true);
369            }
370            // TODO: Implement higher level compaction
371        }
372
373        Ok(false)
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    // Default configuration exposes the documented trigger values.
    #[test]
    fn test_compaction_config_default() {
        let config = CompactionConfig::default();
        assert_eq!(config.level0_trigger, 4);
        assert_eq!(config.max_levels, 7);
    }

    // The heap ordering must behave as a min-heap on keys: the entry with
    // the smaller key compares as "greater" so BinaryHeap pops it first.
    #[test]
    fn test_merge_entry_ordering() {
        let e1 = MergeEntry {
            key: b"a".to_vec(),
            entry: SSTableEntry::value(b"a".to_vec(), b"1".to_vec()),
            source_idx: 0,
        };
        let e2 = MergeEntry {
            key: b"b".to_vec(),
            entry: SSTableEntry::value(b"b".to_vec(), b"2".to_vec()),
            source_idx: 0,
        };

        // In a max-heap, larger values come first
        // We want min-heap behavior, so e1 (key "a") should be "greater"
        assert!(e1 > e2);
    }

    // Level-0 compaction should trigger exactly when the manifest holds at
    // least `level0_trigger` level-0 SSTables.
    #[test]
    fn test_needs_compaction() {
        let dir = tempdir().unwrap();
        let config = CompactionConfig {
            level0_trigger: 2,
            ..Default::default()
        };
        let worker = CompactionWorker::new(dir.path(), config);
        let mut manifest = Manifest::open(dir.path()).unwrap();

        assert!(!worker.needs_compaction(&manifest));

        // Add level 0 SSTables (metadata only; no real files are needed
        // because needs_compaction only counts manifest entries).
        for i in 0..2 {
            let meta = SSTableMeta {
                path: PathBuf::from(format!("test{}.sst", i)),
                min_key: vec![],
                max_key: vec![],
                entry_count: 0,
                file_size: 0,
                level: 0,
                sequence: 0,
            };
            manifest.add_sstable(&meta).unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    // Merging two overlapping SSTables keeps every distinct key and, for a
    // key present in both, keeps the value from the LATER input (higher
    // source index) — here path2's "3-new" for key "c".
    #[test]
    fn test_merge_sstables() {
        let dir = tempdir().unwrap();
        let sst_dir = dir.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        // Create two SSTables with overlapping keys
        let path1 = sst_dir.join("test1.sst");
        let mut writer1 = SSTableWriter::new(&path1).unwrap();
        writer1
            .add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
            .unwrap();
        writer1
            .add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec()))
            .unwrap();
        writer1.finish().unwrap();

        let path2 = sst_dir.join("test2.sst");
        let mut writer2 = SSTableWriter::new(&path2).unwrap();
        writer2
            .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
            .unwrap();
        writer2
            .add(SSTableEntry::value(b"c".to_vec(), b"3-new".to_vec()))
            .unwrap(); // Overwrites
        writer2.finish().unwrap();

        // Merge
        let config = CompactionConfig::default();
        let mut worker = CompactionWorker::new(dir.path(), config);
        let outputs = worker.merge_sstables(&[path1, path2], 1).unwrap();

        assert!(!outputs.is_empty());

        // Verify merged content
        let mut reader = SSTableReader::open(&outputs[0].path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(reader.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        // "c" should have the newer value from the second SSTable
        assert_eq!(reader.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}