amaters_core/storage/compaction.rs

//! Compaction strategy for LSM-Tree
//!
//! Implements level-based compaction to:
//! - Merge SSTables from L0 to L1
//! - Merge overlapping SSTables within levels
//! - Remove tombstones (deleted keys)
//! - Maintain level size targets
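//!
//! # Example
//!
//! A minimal usage sketch (not compiled as a doctest); `l0_tables`, `l1_tables`,
//! `sstable_config`, `output_dir`, and `next_id` are assumed to be provided by
//! the surrounding store:
//!
//! ```ignore
//! let planner = CompactionPlanner::new(CompactionConfig::default());
//! if planner.needs_l0_compaction(l0_tables.len()) {
//!     if let Some(task) = planner.plan_compaction(0, l0_tables, l1_tables) {
//!         let mut executor = CompactionExecutor::new(sstable_config);
//!         let new_tables = executor.execute_compaction(task, output_dir, &mut next_id)?;
//!     }
//! }
//! ```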

use crate::error::{AmateRSError, ErrorContext, Result};
use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
use crate::types::{CipherBlob, Key};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

/// Compaction strategy
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Level-based compaction (default)
    LevelBased,
    /// Size-tiered compaction
    SizeTiered,
}

/// Compaction configuration
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Strategy to use
    pub strategy: CompactionStrategy,
    /// L0 compaction threshold (number of SSTables)
    pub l0_threshold: usize,
    /// Level size multiplier
    pub level_multiplier: usize,
    /// Base level size (L1 target size in bytes)
    pub base_level_size: u64,
    /// Maximum compaction size (bytes)
    pub max_compaction_bytes: u64,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            strategy: CompactionStrategy::LevelBased,
            l0_threshold: 4,
            level_multiplier: 10,
            base_level_size: 10 * 1024 * 1024,       // 10 MB
            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
        }
    }
}

/// Compaction task
#[derive(Debug, Clone)]
pub struct CompactionTask {
    /// Source level
    pub source_level: usize,
    /// Target level
    pub target_level: usize,
    /// SSTables to compact from source level
    pub source_sstables: Vec<SSTableMetadata>,
    /// SSTables to merge from target level (if any)
    pub target_sstables: Vec<SSTableMetadata>,
}

/// Compaction statistics
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Total compactions performed
    pub total_compactions: u64,
    /// Total bytes read during compaction
    pub bytes_read: u64,
    /// Total bytes written during compaction
    pub bytes_written: u64,
    /// Total keys processed
    pub keys_processed: u64,
    /// Total tombstones removed
    pub tombstones_removed: u64,
}

/// Compaction planner
pub struct CompactionPlanner {
    config: CompactionConfig,
}

impl CompactionPlanner {
    /// Create a new compaction planner
    pub fn new(config: CompactionConfig) -> Self {
        Self { config }
    }

    /// Check if L0 needs compaction
    pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
        l0_sstable_count >= self.config.l0_threshold
    }

    /// Check if a level needs compaction
    pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
        if level == 0 {
            return false; // L0 uses count-based threshold
        }

        let target_size = self.level_target_size(level);
        level_size > target_size
    }

    /// Calculate target size for a level
    pub fn level_target_size(&self, level: usize) -> u64 {
        if level == 0 {
            return 0; // L0 doesn't have a size target
        }

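        // With the default config (10 MB base, 10x multiplier) this works out
        // to L1 = 10 MB, L2 = 100 MB, L3 = 1000 MB, and so on.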
        self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
    }

    /// Plan a compaction task
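    ///
    /// Returns `None` when there is nothing to compact at `source_level`.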
    pub fn plan_compaction(
        &self,
        source_level: usize,
        source_sstables: Vec<SSTableMetadata>,
        target_sstables: Vec<SSTableMetadata>,
    ) -> Option<CompactionTask> {
        if source_sstables.is_empty() {
            return None;
        }

        // For L0 → L1, take all L0 SSTables
        let source_to_compact = if source_level == 0 {
            source_sstables
        } else {
            // For L1+, select SSTables based on size
            self.select_sstables_for_compaction(source_sstables)
        };

        if source_to_compact.is_empty() {
            return None;
        }

        // Find overlapping SSTables in target level
        let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);

        Some(CompactionTask {
            source_level,
            target_level: source_level + 1,
            source_sstables: source_to_compact,
            target_sstables: target_to_merge,
        })
    }

    /// Select SSTables for compaction (L1+)
    fn select_sstables_for_compaction(
        &self,
        sstables: Vec<SSTableMetadata>,
    ) -> Vec<SSTableMetadata> {
        // Simple strategy: compact oldest SSTables first
        // In production, this should consider:
        // - SSTable overlap
        // - Size ratios
        // - Read frequency (hot/cold data)

        let mut selected = Vec::new();
        let mut total_size = 0u64;

        for sstable in sstables {
            if total_size + sstable.file_size > self.config.max_compaction_bytes {
                break;
            }

            total_size += sstable.file_size;
            selected.push(sstable);

            // Stop once two SSTables have been selected; that bounds the work
            // done in a single compaction pass
            if selected.len() >= 2 {
                break;
            }
        }

        selected
    }

    /// Find overlapping SSTables in target level
    fn find_overlapping_sstables(
        &self,
        source_sstables: &[SSTableMetadata],
        target_sstables: &[SSTableMetadata],
    ) -> Vec<SSTableMetadata> {
        if source_sstables.is_empty() {
            return Vec::new();
        }

        // Find min and max keys from source SSTables (safe: checked is_empty above)
        let min_key = source_sstables
            .iter()
            .map(|s| &s.min_key)
            .min()
            .expect("source_sstables is non-empty");

        let max_key = source_sstables
            .iter()
            .map(|s| &s.max_key)
            .max()
            .expect("source_sstables is non-empty");

        // Find all target SSTables that overlap with this range
        target_sstables
            .iter()
            .filter(|sstable| {
                // Check if ranges overlap
                !(&sstable.max_key < min_key || &sstable.min_key > max_key)
            })
            .cloned()
            .collect()
    }
}

/// Compaction executor
pub struct CompactionExecutor {
    config: SSTableConfig,
    stats: CompactionStats,
}

impl CompactionExecutor {
    /// Create a new compaction executor
    pub fn new(config: SSTableConfig) -> Self {
        Self {
            config,
            stats: CompactionStats::default(),
        }
    }

    /// Execute a compaction task
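    ///
    /// Returns metadata for the newly written SSTables at the target level.
    /// The input SSTable files are not deleted here; the caller is expected to
    /// retire them once the new tables are installed.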
    pub fn execute_compaction(
        &mut self,
        task: CompactionTask,
        output_dir: &Path,
        next_sstable_id: &mut u64,
    ) -> Result<Vec<SSTableMetadata>> {
        // Collect all entries from source and target SSTables. The target
        // level holds older data, so its SSTables are read first; entries from
        // the newer source level are read afterwards and overwrite duplicates.
        let mut all_entries = BTreeMap::new();

        // Read from target SSTables (overlapping, older data)
        for sstable in &task.target_sstables {
            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
            self.stats.bytes_read += sstable.file_size;
        }

        // Read from source SSTables (newer data takes precedence)
        for sstable in &task.source_sstables {
            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
            self.stats.bytes_read += sstable.file_size;
        }

        // Write merged SSTables to target level
        let output_sstables = self.write_compacted_sstables(
            all_entries,
            task.target_level,
            output_dir,
            next_sstable_id,
        )?;

        self.stats.total_compactions += 1;

        Ok(output_sstables)
    }

    /// Read entries from an SSTable
    fn read_sstable_entries(
        &mut self,
        path: &Path,
        entries: &mut BTreeMap<Key, Option<CipherBlob>>,
    ) -> Result<()> {
        let reader = SSTableReader::open(path)?;
        let sstable_entries = reader.iter()?;

        for (key, value) in sstable_entries {
            self.stats.keys_processed += 1;
            // Later inserts overwrite earlier ones, so older SSTables must be
            // read before newer ones (see execute_compaction)
            entries.insert(key, Some(value));
        }

        Ok(())
    }

    /// Write compacted entries to new SSTables
    fn write_compacted_sstables(
        &mut self,
        entries: BTreeMap<Key, Option<CipherBlob>>,
        target_level: usize,
        output_dir: &Path,
        next_id: &mut u64,
    ) -> Result<Vec<SSTableMetadata>> {
        let mut output_sstables = Vec::new();
        let mut current_writer: Option<SSTableWriter> = None;
        let mut current_path: Option<PathBuf> = None;
        let mut current_size = 0usize;
        let mut current_min_key: Option<Key> = None;
        let mut current_max_key: Option<Key> = None;
        let mut current_entries = 0usize;

        const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; // 2 MB per SSTable

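        // Keys arrive in sorted order from the BTreeMap, so output files are
        // written sequentially; a new SSTable is started once the current
        // file's estimated size reaches MAX_SSTABLE_SIZE.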
        for (key, value_opt) in entries {
            // Skip tombstones (None values)
            let value = match value_opt {
                Some(v) => v,
                None => {
                    self.stats.tombstones_removed += 1;
                    continue;
                }
            };

            // Start new SSTable if needed
            if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
                // Finish previous SSTable
                if let Some(writer) = current_writer.take() {
                    writer.finish()?;

                    if let (Some(path), Some(min_key), Some(max_key)) = (
                        current_path.take(),
                        current_min_key.take(),
                        current_max_key.take(),
                    ) {
                        let file_size = std::fs::metadata(&path)
                            .map_err(|e| {
                                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                                    "Failed to get SSTable size: {}",
                                    e
                                )))
                            })?
                            .len();

                        self.stats.bytes_written += file_size;

                        output_sstables.push(SSTableMetadata {
                            path,
                            min_key,
                            max_key,
                            num_entries: current_entries,
                            file_size,
                            level: target_level,
                        });
                    }
                }

                // Start new SSTable
                let id = *next_id;
                *next_id += 1;
                let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
                let writer = SSTableWriter::new(&path, self.config.clone())?;

                current_writer = Some(writer);
                current_path = Some(path);
                current_size = 0;
                current_min_key = None;
                current_max_key = None;
                current_entries = 0;
            }

            // Write entry
            if let Some(ref mut writer) = current_writer {
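                // Rough per-entry size estimate: an assumed ~16 bytes of
                // framing overhead plus the raw key and value lengths, used
                // only to decide when to roll over to a new SSTable.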
                let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
                writer.add(key.clone(), value)?;
                current_size += entry_size;
                current_entries += 1;

                if current_min_key.is_none() {
                    current_min_key = Some(key.clone());
                }
                current_max_key = Some(key);
            }
        }

        // Finish final SSTable
        if let Some(writer) = current_writer {
            writer.finish()?;

            if let (Some(path), Some(min_key), Some(max_key)) =
                (current_path, current_min_key, current_max_key)
            {
                let file_size = std::fs::metadata(&path)
                    .map_err(|e| {
                        AmateRSError::StorageIntegrity(ErrorContext::new(format!(
                            "Failed to get SSTable size: {}",
                            e
                        )))
                    })?
                    .len();

                self.stats.bytes_written += file_size;

                output_sstables.push(SSTableMetadata {
                    path,
                    min_key,
                    max_key,
                    num_entries: current_entries,
                    file_size,
                    level: target_level,
                });
            }
        }

        Ok(output_sstables)
    }

    /// Get compaction statistics
    pub fn stats(&self) -> &CompactionStats {
        &self.stats
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compaction_planner_l0_threshold() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        assert!(!planner.needs_l0_compaction(3));
        assert!(planner.needs_l0_compaction(4));
        assert!(planner.needs_l0_compaction(5));
    }

    #[test]
    fn test_compaction_planner_level_sizes() {
        let config = CompactionConfig {
            base_level_size: 10 * 1024 * 1024, // 10 MB
            level_multiplier: 10,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024); // 10 MB
        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024); // 100 MB
        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024); // 1000 MB (~1 GB)
    }

    #[test]
    fn test_compaction_planner_needs_compaction() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        // L0 doesn't use size-based threshold
        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));

        // L1 target is 10 MB
        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
    }

    #[test]
    fn test_find_overlapping_sstables() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("key_005"),
            max_key: Key::from_str("key_015"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        let target = vec![
            SSTableMetadata {
                path: PathBuf::from("t1.sst"),
                min_key: Key::from_str("key_000"),
                max_key: Key::from_str("key_010"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
            SSTableMetadata {
                path: PathBuf::from("t2.sst"),
                min_key: Key::from_str("key_020"),
                max_key: Key::from_str("key_030"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
        ];

        let overlapping = planner.find_overlapping_sstables(&source, &target);

        assert_eq!(overlapping.len(), 1);
        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
    }
}