Skip to main content

amaters_core/storage/
compaction.rs

1//! Compaction strategy for LSM-Tree
2//!
3//! Implements level-based and size-tiered compaction strategies to:
4//! - Merge SSTables from L0 to L1
5//! - Merge overlapping SSTables within levels
6//! - Remove tombstones (deleted keys) with TTL-based garbage collection
7//! - Maintain level size targets
8//! - Track compaction statistics with atomic counters
9//! - Throttle compaction write rate
10
11use crate::error::{AmateRSError, ErrorContext, Result};
12use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
13use crate::types::{CipherBlob, Key};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::{Duration, Instant};
19
/// Compaction strategy
///
/// Selects how SSTables are chosen for merging. `CompactionConfig::default()`
/// uses [`CompactionStrategy::LevelBased`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Level-based compaction (default)
    LevelBased,
    /// Size-tiered compaction
    SizeTiered,
}
28
/// Compaction configuration
///
/// Tunables read by the compaction planner, executor and write throttler in
/// this module. See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Strategy to use
    pub strategy: CompactionStrategy,
    /// L0 compaction threshold (number of SSTables); L0 needs compaction once
    /// its SSTable count is greater than or equal to this value
    pub l0_threshold: usize,
    /// Level size multiplier: each level's target size is this many times the
    /// target size of the level above it
    pub level_multiplier: usize,
    /// Base level size (L1 target size in bytes)
    pub base_level_size: u64,
    /// Maximum compaction size (bytes): upper bound on the summed file sizes
    /// selected from a source level at L1 or deeper
    pub max_compaction_bytes: u64,
    /// Minimum SSTable size for size-tiered compaction (bytes); smaller
    /// SSTables are left out of every size tier
    pub min_sstable_size: u64,
    /// Size ratio for grouping SSTables in size-tiered compaction
    /// SSTables within this ratio of each other are in the same tier
    pub size_ratio: f64,
    /// Minimum number of SSTables in a size tier to trigger compaction
    pub min_tier_size: usize,
    /// Maximum compaction write rate in bytes per second (0 = unlimited)
    pub max_compaction_bytes_per_sec: u64,
    /// Tombstone time-to-live: tombstones older than this are garbage collected
    pub tombstone_ttl: Duration,
}
54
55impl Default for CompactionConfig {
56    fn default() -> Self {
57        Self {
58            strategy: CompactionStrategy::LevelBased,
59            l0_threshold: 4,
60            level_multiplier: 10,
61            base_level_size: 10 * 1024 * 1024,       // 10 MB
62            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
63            min_sstable_size: 1024,                  // 1 KB
64            size_ratio: 2.0,
65            min_tier_size: 4,
66            max_compaction_bytes_per_sec: 0, // unlimited
67            tombstone_ttl: Duration::from_secs(7 * 24 * 3600), // 7 days
68        }
69    }
70}
71
/// Compaction task
///
/// Describes one unit of compaction work: the inputs taken from the source
/// level, the inputs taken from the target level, and the level the merged
/// output is written to.
#[derive(Debug, Clone)]
pub struct CompactionTask {
    /// Source level
    pub source_level: usize,
    /// Target level
    pub target_level: usize,
    /// SSTables to compact from source level
    pub source_sstables: Vec<SSTableMetadata>,
    /// SSTables to merge from target level (if any)
    pub target_sstables: Vec<SSTableMetadata>,
}
84
/// Thread-safe compaction statistics tracked with atomic counters
///
/// Every counter is cumulative and starts at zero. `Default` is derived:
/// `AtomicU64::default()` is zero, which is exactly what a hand-written
/// implementation would produce.
#[derive(Default)]
pub struct CompactionStats {
    /// Total bytes read during compaction
    pub bytes_read: AtomicU64,
    /// Total bytes written during compaction
    pub bytes_written: AtomicU64,
    /// Total files merged
    pub files_merged: AtomicU64,
    /// Total compactions completed
    pub compactions_completed: AtomicU64,
    /// Total duration in milliseconds
    pub total_duration_ms: AtomicU64,
    /// Total keys processed
    pub keys_processed: AtomicU64,
    /// Total tombstones removed
    pub tombstones_removed: AtomicU64,
}
116
117impl std::fmt::Debug for CompactionStats {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("CompactionStats")
120            .field("bytes_read", &self.bytes_read.load(Ordering::Relaxed))
121            .field("bytes_written", &self.bytes_written.load(Ordering::Relaxed))
122            .field("files_merged", &self.files_merged.load(Ordering::Relaxed))
123            .field(
124                "compactions_completed",
125                &self.compactions_completed.load(Ordering::Relaxed),
126            )
127            .field(
128                "total_duration_ms",
129                &self.total_duration_ms.load(Ordering::Relaxed),
130            )
131            .field(
132                "keys_processed",
133                &self.keys_processed.load(Ordering::Relaxed),
134            )
135            .field(
136                "tombstones_removed",
137                &self.tombstones_removed.load(Ordering::Relaxed),
138            )
139            .finish()
140    }
141}
142
143impl CompactionStats {
144    /// Create a snapshot of current stats as simple values
145    pub fn snapshot(&self) -> CompactionStatsSnapshot {
146        CompactionStatsSnapshot {
147            bytes_read: self.bytes_read.load(Ordering::Relaxed),
148            bytes_written: self.bytes_written.load(Ordering::Relaxed),
149            files_merged: self.files_merged.load(Ordering::Relaxed),
150            compactions_completed: self.compactions_completed.load(Ordering::Relaxed),
151            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
152            keys_processed: self.keys_processed.load(Ordering::Relaxed),
153            tombstones_removed: self.tombstones_removed.load(Ordering::Relaxed),
154        }
155    }
156}
157
/// Non-atomic snapshot of compaction statistics
///
/// Plain-value copy of the counters in `CompactionStats`, produced by
/// `CompactionStats::snapshot`. The derived `Default` is all zeros.
#[derive(Debug, Clone, Default)]
pub struct CompactionStatsSnapshot {
    /// Total bytes read during compaction
    pub bytes_read: u64,
    /// Total bytes written during compaction
    pub bytes_written: u64,
    /// Total files merged
    pub files_merged: u64,
    /// Total compactions completed
    pub compactions_completed: u64,
    /// Total duration in milliseconds
    pub total_duration_ms: u64,
    /// Total keys processed
    pub keys_processed: u64,
    /// Total tombstones removed
    pub tombstones_removed: u64,
}
176
/// Compaction write throttler
///
/// Limits the rate of compaction writes to avoid overwhelming I/O.
/// Uses `std::thread::sleep` for tokio-free operation.
///
/// Bytes are accounted in windows of at least one second. Within a window the
/// caller is put to sleep whenever the bytes recorded so far would take longer
/// to write at the configured rate than the time that has actually elapsed.
#[derive(Debug)]
pub struct CompactionThrottler {
    /// Maximum bytes per second (0 = unlimited)
    max_bytes_per_sec: u64,
    /// Bytes written in the current tracking window
    bytes_in_window: u64,
    /// Start time of the current tracking window
    window_start: Instant,
}

impl CompactionThrottler {
    /// Minimum age after which the tracking window is discarded and restarted.
    const WINDOW_LEN: Duration = Duration::from_secs(1);

    /// Create a new throttler with the given rate limit
    ///
    /// A `max_bytes_per_sec` of 0 disables throttling entirely.
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            bytes_in_window: 0,
            window_start: Instant::now(),
        }
    }

    /// Record bytes written and sleep if rate limit is exceeded
    ///
    /// Blocks the calling thread for as long as needed to bring the average
    /// rate of the current window back down to `max_bytes_per_sec`. Returns
    /// immediately when throttling is disabled.
    pub fn throttle(&mut self, bytes_written: u64) {
        if self.max_bytes_per_sec == 0 {
            return; // unlimited
        }

        // Restart a stale window BEFORE accounting for the new bytes. If the
        // reset happened after the rate check, time spent idle since the last
        // write would count as credit and the first write after a pause would
        // never be throttled.
        if self.window_start.elapsed() >= Self::WINDOW_LEN {
            self.bytes_in_window = 0;
            self.window_start = Instant::now();
        }

        // Saturate rather than overflow on absurdly large byte counts.
        self.bytes_in_window = self.bytes_in_window.saturating_add(bytes_written);

        let elapsed_secs = self.window_start.elapsed().as_secs_f64();

        // Time the bytes recorded in this window should take at the rate limit
        let expected_secs = self.bytes_in_window as f64 / self.max_bytes_per_sec as f64;

        if expected_secs > elapsed_secs {
            std::thread::sleep(Duration::from_secs_f64(expected_secs - elapsed_secs));
        }
    }

    /// Check if throttling is enabled
    pub fn is_enabled(&self) -> bool {
        self.max_bytes_per_sec > 0
    }
}
232
/// A size tier: a group of SSTables with similar sizes
///
/// Produced by `CompactionPlanner::group_by_size_tier`.
#[derive(Debug, Clone)]
pub struct SizeTier {
    /// SSTables in this tier
    pub sstables: Vec<SSTableMetadata>,
    /// Average file size in this tier (integer division of the summed file
    /// sizes by the number of SSTables)
    pub avg_size: u64,
}
241
/// Compaction planner
///
/// Decides, from a [`CompactionConfig`], whether a level needs compaction and
/// which SSTables a compaction task should include. It performs no I/O itself.
pub struct CompactionPlanner {
    /// Thresholds and limits consulted by every planning method
    config: CompactionConfig,
}
246
247impl CompactionPlanner {
248    /// Create a new compaction planner
249    pub fn new(config: CompactionConfig) -> Self {
250        Self { config }
251    }
252
253    /// Check if L0 needs compaction
254    pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
255        l0_sstable_count >= self.config.l0_threshold
256    }
257
258    /// Check if a level needs compaction
259    pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
260        if level == 0 {
261            return false; // L0 uses count-based threshold
262        }
263
264        let target_size = self.level_target_size(level);
265        level_size > target_size
266    }
267
268    /// Calculate target size for a level
269    pub fn level_target_size(&self, level: usize) -> u64 {
270        if level == 0 {
271            return 0; // L0 doesn't have a size target
272        }
273
274        self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
275    }
276
277    /// Plan a compaction task (level-based strategy)
278    pub fn plan_compaction(
279        &self,
280        source_level: usize,
281        source_sstables: Vec<SSTableMetadata>,
282        target_sstables: Vec<SSTableMetadata>,
283    ) -> Option<CompactionTask> {
284        if source_sstables.is_empty() {
285            return None;
286        }
287
288        // For L0 → L1, take all L0 SSTables
289        let source_to_compact = if source_level == 0 {
290            source_sstables
291        } else {
292            // For L1+, select SSTables based on size
293            self.select_sstables_for_compaction(source_sstables)
294        };
295
296        if source_to_compact.is_empty() {
297            return None;
298        }
299
300        // Find overlapping SSTables in target level
301        let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
302
303        Some(CompactionTask {
304            source_level,
305            target_level: source_level + 1,
306            source_sstables: source_to_compact,
307            target_sstables: target_to_merge,
308        })
309    }
310
311    /// Plan a size-tiered compaction task
312    ///
313    /// Groups SSTables by similar size (within `size_ratio` of each other).
314    /// When a tier has at least `min_tier_size` SSTables, they are merged.
315    pub fn plan_size_tiered_compaction(
316        &self,
317        sstables: Vec<SSTableMetadata>,
318    ) -> Option<CompactionTask> {
319        let tiers = self.group_by_size_tier(sstables);
320
321        // Find the first tier that has enough SSTables to trigger compaction
322        for tier in tiers {
323            if tier.sstables.len() >= self.config.min_tier_size {
324                // Determine the appropriate target level
325                // Size-tiered puts output into level based on size
326                let max_level = tier.sstables.iter().map(|s| s.level).max().unwrap_or(0);
327                let target_level = max_level + 1;
328
329                return Some(CompactionTask {
330                    source_level: max_level,
331                    target_level,
332                    source_sstables: tier.sstables,
333                    target_sstables: Vec::new(),
334                });
335            }
336        }
337
338        None
339    }
340
341    /// Group SSTables into size tiers
342    ///
343    /// SSTables are grouped such that the largest file in a tier is at most
344    /// `size_ratio` times the smallest file in the same tier.
345    pub fn group_by_size_tier(&self, mut sstables: Vec<SSTableMetadata>) -> Vec<SizeTier> {
346        if sstables.is_empty() {
347            return Vec::new();
348        }
349
350        // Sort by file size
351        sstables.sort_by_key(|s| s.file_size);
352
353        // Filter out SSTables smaller than minimum size
354        let sstables: Vec<SSTableMetadata> = sstables
355            .into_iter()
356            .filter(|s| s.file_size >= self.config.min_sstable_size)
357            .collect();
358
359        if sstables.is_empty() {
360            return Vec::new();
361        }
362
363        let mut tiers: Vec<SizeTier> = Vec::new();
364        let mut current_tier_sstables: Vec<SSTableMetadata> = Vec::new();
365        let mut tier_min_size: u64 = 0;
366
367        for sstable in sstables {
368            if current_tier_sstables.is_empty() {
369                tier_min_size = sstable.file_size;
370                current_tier_sstables.push(sstable);
371            } else if (sstable.file_size as f64) <= (tier_min_size as f64 * self.config.size_ratio)
372            {
373                // Within the size ratio, add to current tier
374                current_tier_sstables.push(sstable);
375            } else {
376                // Start a new tier
377                let avg_size = current_tier_sstables
378                    .iter()
379                    .map(|s| s.file_size)
380                    .sum::<u64>()
381                    / current_tier_sstables.len().max(1) as u64;
382                tiers.push(SizeTier {
383                    sstables: std::mem::take(&mut current_tier_sstables),
384                    avg_size,
385                });
386                tier_min_size = sstable.file_size;
387                current_tier_sstables.push(sstable);
388            }
389        }
390
391        // Don't forget the last tier
392        if !current_tier_sstables.is_empty() {
393            let avg_size = current_tier_sstables
394                .iter()
395                .map(|s| s.file_size)
396                .sum::<u64>()
397                / current_tier_sstables.len().max(1) as u64;
398            tiers.push(SizeTier {
399                sstables: current_tier_sstables,
400                avg_size,
401            });
402        }
403
404        tiers
405    }
406
407    /// Select SSTables for compaction (L1+)
408    fn select_sstables_for_compaction(
409        &self,
410        sstables: Vec<SSTableMetadata>,
411    ) -> Vec<SSTableMetadata> {
412        let mut selected = Vec::new();
413        let mut total_size = 0u64;
414
415        for sstable in sstables {
416            if total_size + sstable.file_size > self.config.max_compaction_bytes {
417                break;
418            }
419
420            total_size += sstable.file_size;
421            selected.push(sstable);
422
423            // Compact at least 2 SSTables
424            if selected.len() >= 2 {
425                break;
426            }
427        }
428
429        selected
430    }
431
432    /// Find overlapping SSTables in target level
433    pub fn find_overlapping_sstables(
434        &self,
435        source_sstables: &[SSTableMetadata],
436        target_sstables: &[SSTableMetadata],
437    ) -> Vec<SSTableMetadata> {
438        if source_sstables.is_empty() {
439            return Vec::new();
440        }
441
442        // Find min and max keys from source SSTables (safe: checked is_empty above)
443        let min_key = source_sstables
444            .iter()
445            .map(|s| &s.min_key)
446            .min()
447            .expect("source_sstables is non-empty");
448
449        let max_key = source_sstables
450            .iter()
451            .map(|s| &s.max_key)
452            .max()
453            .expect("source_sstables is non-empty");
454
455        // Find all target SSTables that overlap with this range
456        target_sstables
457            .iter()
458            .filter(|sstable| {
459                // Check if ranges overlap
460                !(&sstable.max_key < min_key || &sstable.min_key > max_key)
461            })
462            .cloned()
463            .collect()
464    }
465}
466
/// Tombstone entry with timestamp for TTL-based garbage collection
///
/// NOTE(review): `CompactionExecutor` tracks tombstones in its own
/// `BTreeMap<Key, Instant>` and does not use this type in the code shown in
/// this module; confirm whether it is consumed elsewhere.
#[derive(Debug, Clone)]
pub struct TombstoneEntry {
    /// The key that was deleted
    pub key: Key,
    /// When the tombstone was created
    pub created_at: Instant,
}
475
/// Compaction executor
///
/// Carries out a `CompactionTask`: reads the input SSTables, merges their
/// entries, writes the result as new SSTables and updates the shared
/// statistics while doing so.
pub struct CompactionExecutor {
    /// Configuration handed to every `SSTableWriter` created for output
    config: SSTableConfig,
    /// Compaction tunables; the tombstone TTL is read from here
    compaction_config: CompactionConfig,
    /// Cumulative counters updated during compaction
    stats: Arc<CompactionStats>,
    /// Rate limiter invoked after each output SSTable is finished
    throttler: CompactionThrottler,
    /// Active tombstones with their creation times
    tombstones: BTreeMap<Key, Instant>,
}
485
486impl CompactionExecutor {
487    /// Create a new compaction executor
488    pub fn new(config: SSTableConfig) -> Self {
489        Self {
490            config,
491            compaction_config: CompactionConfig::default(),
492            stats: Arc::new(CompactionStats::default()),
493            throttler: CompactionThrottler::new(0),
494            tombstones: BTreeMap::new(),
495        }
496    }
497
498    /// Create a new compaction executor with compaction configuration
499    pub fn with_compaction_config(
500        config: SSTableConfig,
501        compaction_config: CompactionConfig,
502    ) -> Self {
503        let throttler = CompactionThrottler::new(compaction_config.max_compaction_bytes_per_sec);
504        Self {
505            config,
506            compaction_config,
507            stats: Arc::new(CompactionStats::default()),
508            throttler,
509            tombstones: BTreeMap::new(),
510        }
511    }
512
513    /// Register a tombstone with its creation time
514    pub fn register_tombstone(&mut self, key: Key, created_at: Instant) {
515        self.tombstones.insert(key, created_at);
516    }
517
518    /// Check if a tombstone has expired based on TTL
519    fn is_tombstone_expired(&self, key: &Key) -> bool {
520        if let Some(created_at) = self.tombstones.get(key) {
521            created_at.elapsed() >= self.compaction_config.tombstone_ttl
522        } else {
523            false
524        }
525    }
526
527    /// Execute a compaction task
528    pub fn execute_compaction(
529        &mut self,
530        task: CompactionTask,
531        output_dir: &Path,
532        next_sstable_id: &mut u64,
533    ) -> Result<Vec<SSTableMetadata>> {
534        let start_time = Instant::now();
535
536        // Track files merged
537        let files_merged = (task.source_sstables.len() + task.target_sstables.len()) as u64;
538        self.stats
539            .files_merged
540            .fetch_add(files_merged, Ordering::Relaxed);
541
542        let _span =
543            tracing::debug_span!("amaters.storage.compaction", files_merged = files_merged,)
544                .entered();
545
546        // Collect all entries from source and target SSTables
547        let mut all_entries: BTreeMap<Key, Option<CipherBlob>> = BTreeMap::new();
548
549        // Read from source SSTables
550        for sstable in &task.source_sstables {
551            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
552            self.stats
553                .bytes_read
554                .fetch_add(sstable.file_size, Ordering::Relaxed);
555        }
556
557        // Read from target SSTables (overlapping)
558        for sstable in &task.target_sstables {
559            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
560            self.stats
561                .bytes_read
562                .fetch_add(sstable.file_size, Ordering::Relaxed);
563        }
564
565        // Write merged SSTables to target level
566        let output_sstables = self.write_compacted_sstables(
567            all_entries,
568            task.target_level,
569            output_dir,
570            next_sstable_id,
571        )?;
572
573        self.stats
574            .compactions_completed
575            .fetch_add(1, Ordering::Relaxed);
576
577        let duration_ms = start_time.elapsed().as_millis() as u64;
578        self.stats
579            .total_duration_ms
580            .fetch_add(duration_ms, Ordering::Relaxed);
581
582        Ok(output_sstables)
583    }
584
585    /// Read entries from an SSTable
586    fn read_sstable_entries(
587        &self,
588        path: &Path,
589        entries: &mut BTreeMap<Key, Option<CipherBlob>>,
590    ) -> Result<()> {
591        let reader = SSTableReader::open(path)?;
592        let sstable_entries = reader.iter()?;
593
594        for (key, value) in sstable_entries {
595            self.stats.keys_processed.fetch_add(1, Ordering::Relaxed);
596            // Later entries overwrite earlier ones (LSM semantics)
597            entries.insert(key, Some(value));
598        }
599
600        Ok(())
601    }
602
603    /// Write compacted entries to new SSTables
604    fn write_compacted_sstables(
605        &mut self,
606        entries: BTreeMap<Key, Option<CipherBlob>>,
607        target_level: usize,
608        output_dir: &Path,
609        next_id: &mut u64,
610    ) -> Result<Vec<SSTableMetadata>> {
611        let mut output_sstables = Vec::new();
612        let mut current_writer: Option<SSTableWriter> = None;
613        let mut current_path: Option<PathBuf> = None;
614        let mut current_size = 0usize;
615        let mut current_min_key: Option<Key> = None;
616        let mut current_max_key: Option<Key> = None;
617        let mut current_entries = 0usize;
618
619        const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; // 2 MB per SSTable
620
621        for (key, value_opt) in entries {
622            // Handle tombstones (None values)
623            let value = match value_opt {
624                Some(v) => v,
625                None => {
626                    // Check if tombstone has expired (TTL-based GC)
627                    if self.is_tombstone_expired(&key) {
628                        self.stats
629                            .tombstones_removed
630                            .fetch_add(1, Ordering::Relaxed);
631                        // Remove from tombstone tracking
632                        self.tombstones.remove(&key);
633                        continue;
634                    }
635                    // Tombstone not expired yet, still skip writing it
636                    // (original behavior: all tombstones removed during compaction)
637                    self.stats
638                        .tombstones_removed
639                        .fetch_add(1, Ordering::Relaxed);
640                    continue;
641                }
642            };
643
644            // Start new SSTable if needed
645            if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
646                // Finish previous SSTable
647                if let Some(writer) = current_writer.take() {
648                    writer.finish()?;
649
650                    if let (Some(path), Some(min_key), Some(max_key)) = (
651                        current_path.take(),
652                        current_min_key.take(),
653                        current_max_key.take(),
654                    ) {
655                        let file_size = std::fs::metadata(&path)
656                            .map_err(|e| {
657                                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
658                                    "Failed to get SSTable size: {}",
659                                    e
660                                )))
661                            })?
662                            .len();
663
664                        self.stats
665                            .bytes_written
666                            .fetch_add(file_size, Ordering::Relaxed);
667                        self.throttler.throttle(file_size);
668
669                        output_sstables.push(SSTableMetadata {
670                            path,
671                            min_key,
672                            max_key,
673                            num_entries: current_entries,
674                            file_size,
675                            level: target_level,
676                        });
677                    }
678                }
679
680                // Start new SSTable
681                let id = *next_id;
682                *next_id += 1;
683                let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
684                let writer = SSTableWriter::new(&path, self.config.clone())?;
685
686                current_writer = Some(writer);
687                current_path = Some(path);
688                current_size = 0;
689                current_min_key = None;
690                current_max_key = None;
691                current_entries = 0;
692            }
693
694            // Write entry
695            if let Some(ref mut writer) = current_writer {
696                let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
697                writer.add(key.clone(), value)?;
698                current_size += entry_size;
699                current_entries += 1;
700
701                if current_min_key.is_none() {
702                    current_min_key = Some(key.clone());
703                }
704                current_max_key = Some(key);
705            }
706        }
707
708        // Finish final SSTable
709        if let Some(writer) = current_writer {
710            writer.finish()?;
711
712            if let (Some(path), Some(min_key), Some(max_key)) =
713                (current_path, current_min_key, current_max_key)
714            {
715                let file_size = std::fs::metadata(&path)
716                    .map_err(|e| {
717                        AmateRSError::StorageIntegrity(ErrorContext::new(format!(
718                            "Failed to get SSTable size: {}",
719                            e
720                        )))
721                    })?
722                    .len();
723
724                self.stats
725                    .bytes_written
726                    .fetch_add(file_size, Ordering::Relaxed);
727                self.throttler.throttle(file_size);
728
729                output_sstables.push(SSTableMetadata {
730                    path,
731                    min_key,
732                    max_key,
733                    num_entries: current_entries,
734                    file_size,
735                    level: target_level,
736                });
737            }
738        }
739
740        Ok(output_sstables)
741    }
742
743    /// Get compaction statistics
744    pub fn stats(&self) -> &CompactionStats {
745        &self.stats
746    }
747
748    /// Get a snapshot of compaction statistics
749    pub fn stats_snapshot(&self) -> CompactionStatsSnapshot {
750        self.stats.snapshot()
751    }
752}
753
754#[cfg(test)]
755mod tests {
756    use super::*;
757
758    #[test]
759    fn test_compaction_planner_l0_threshold() {
760        let config = CompactionConfig::default();
761        let planner = CompactionPlanner::new(config);
762
763        assert!(!planner.needs_l0_compaction(3));
764        assert!(planner.needs_l0_compaction(4));
765        assert!(planner.needs_l0_compaction(5));
766    }
767
768    #[test]
769    fn test_compaction_planner_level_sizes() {
770        let config = CompactionConfig {
771            base_level_size: 10 * 1024 * 1024, // 10 MB
772            level_multiplier: 10,
773            ..Default::default()
774        };
775        let planner = CompactionPlanner::new(config);
776
777        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024); // 10 MB
778        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024); // 100 MB
779        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024); // 1 GB
780    }
781
782    #[test]
783    fn test_compaction_planner_needs_compaction() {
784        let config = CompactionConfig::default();
785        let planner = CompactionPlanner::new(config);
786
787        // L0 doesn't use size-based threshold
788        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));
789
790        // L1 target is 10 MB
791        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
792        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
793    }
794
795    #[test]
796    fn test_find_overlapping_sstables() {
797        let config = CompactionConfig::default();
798        let planner = CompactionPlanner::new(config);
799
800        let source = vec![SSTableMetadata {
801            path: PathBuf::from("s1.sst"),
802            min_key: Key::from_str("key_005"),
803            max_key: Key::from_str("key_015"),
804            num_entries: 10,
805            file_size: 1000,
806            level: 0,
807        }];
808
809        let target = vec![
810            SSTableMetadata {
811                path: PathBuf::from("t1.sst"),
812                min_key: Key::from_str("key_000"),
813                max_key: Key::from_str("key_010"),
814                num_entries: 10,
815                file_size: 1000,
816                level: 1,
817            },
818            SSTableMetadata {
819                path: PathBuf::from("t2.sst"),
820                min_key: Key::from_str("key_020"),
821                max_key: Key::from_str("key_030"),
822                num_entries: 10,
823                file_size: 1000,
824                level: 1,
825            },
826        ];
827
828        let overlapping = planner.find_overlapping_sstables(&source, &target);
829
830        assert_eq!(overlapping.len(), 1);
831        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
832    }
833
834    // ====== Size-tiered compaction tests ======
835
836    #[test]
837    fn test_size_tiered_grouping_basic() {
838        let config = CompactionConfig {
839            strategy: CompactionStrategy::SizeTiered,
840            min_sstable_size: 100,
841            size_ratio: 2.0,
842            min_tier_size: 4,
843            ..Default::default()
844        };
845        let planner = CompactionPlanner::new(config);
846
847        // Create SSTables with similar sizes (all within 2x of each other)
848        let sstables = vec![
849            make_metadata("a.sst", 1000, 0),
850            make_metadata("b.sst", 1200, 0),
851            make_metadata("c.sst", 1500, 0),
852            make_metadata("d.sst", 1800, 0),
853        ];
854
855        let tiers = planner.group_by_size_tier(sstables);
856
857        // All should be in one tier (1000 * 2.0 = 2000, all are <= 2000)
858        assert_eq!(tiers.len(), 1);
859        assert_eq!(tiers[0].sstables.len(), 4);
860    }
861
862    #[test]
863    fn test_size_tiered_grouping_multiple_tiers() {
864        let config = CompactionConfig {
865            strategy: CompactionStrategy::SizeTiered,
866            min_sstable_size: 100,
867            size_ratio: 2.0,
868            min_tier_size: 2,
869            ..Default::default()
870        };
871        let planner = CompactionPlanner::new(config);
872
873        // Two distinct size groups
874        let sstables = vec![
875            make_metadata("small1.sst", 1000, 0),
876            make_metadata("small2.sst", 1500, 0),
877            make_metadata("big1.sst", 10000, 0),
878            make_metadata("big2.sst", 15000, 0),
879        ];
880
881        let tiers = planner.group_by_size_tier(sstables);
882
883        assert_eq!(tiers.len(), 2);
884        assert_eq!(tiers[0].sstables.len(), 2); // small group
885        assert_eq!(tiers[1].sstables.len(), 2); // big group
886    }
887
888    #[test]
889    fn test_size_tiered_merge_trigger() {
890        let config = CompactionConfig {
891            strategy: CompactionStrategy::SizeTiered,
892            min_sstable_size: 100,
893            size_ratio: 2.0,
894            min_tier_size: 4,
895            ..Default::default()
896        };
897        let planner = CompactionPlanner::new(config);
898
899        // Not enough SSTables in any tier
900        let sstables = vec![
901            make_metadata("a.sst", 1000, 0),
902            make_metadata("b.sst", 1200, 0),
903            make_metadata("c.sst", 1500, 0),
904        ];
905
906        let task = planner.plan_size_tiered_compaction(sstables);
907        assert!(task.is_none(), "Should not trigger with only 3 SSTables");
908
909        // Enough SSTables in a tier
910        let sstables = vec![
911            make_metadata("a.sst", 1000, 0),
912            make_metadata("b.sst", 1200, 0),
913            make_metadata("c.sst", 1500, 0),
914            make_metadata("d.sst", 1800, 0),
915        ];
916
917        let task = planner.plan_size_tiered_compaction(sstables);
918        assert!(
919            task.is_some(),
920            "Should trigger with 4 SSTables in same tier"
921        );
922
923        let task = task.expect("task should be Some");
924        assert_eq!(task.source_sstables.len(), 4);
925        assert_eq!(task.target_level, 1);
926    }
927
928    #[test]
929    fn test_size_tiered_filters_small_sstables() {
930        let config = CompactionConfig {
931            strategy: CompactionStrategy::SizeTiered,
932            min_sstable_size: 500,
933            size_ratio: 2.0,
934            min_tier_size: 4,
935            ..Default::default()
936        };
937        let planner = CompactionPlanner::new(config);
938
939        // All SSTables below minimum size
940        let sstables = vec![
941            make_metadata("a.sst", 100, 0),
942            make_metadata("b.sst", 200, 0),
943            make_metadata("c.sst", 300, 0),
944            make_metadata("d.sst", 400, 0),
945        ];
946
947        let tiers = planner.group_by_size_tier(sstables);
948        assert!(tiers.is_empty());
949    }
950
    // ====== Compaction stats tests ======
952
953    #[test]
954    fn test_compaction_stats_default() {
955        let stats = CompactionStats::default();
956        let snapshot = stats.snapshot();
957
958        assert_eq!(snapshot.bytes_read, 0);
959        assert_eq!(snapshot.bytes_written, 0);
960        assert_eq!(snapshot.files_merged, 0);
961        assert_eq!(snapshot.compactions_completed, 0);
962        assert_eq!(snapshot.total_duration_ms, 0);
963        assert_eq!(snapshot.keys_processed, 0);
964        assert_eq!(snapshot.tombstones_removed, 0);
965    }
966
967    #[test]
968    fn test_compaction_stats_atomic_updates() {
969        let stats = CompactionStats::default();
970
971        stats.bytes_read.fetch_add(1000, Ordering::Relaxed);
972        stats.bytes_written.fetch_add(500, Ordering::Relaxed);
973        stats.files_merged.fetch_add(3, Ordering::Relaxed);
974        stats.compactions_completed.fetch_add(1, Ordering::Relaxed);
975        stats.total_duration_ms.fetch_add(42, Ordering::Relaxed);
976        stats.keys_processed.fetch_add(100, Ordering::Relaxed);
977        stats.tombstones_removed.fetch_add(5, Ordering::Relaxed);
978
979        let snapshot = stats.snapshot();
980        assert_eq!(snapshot.bytes_read, 1000);
981        assert_eq!(snapshot.bytes_written, 500);
982        assert_eq!(snapshot.files_merged, 3);
983        assert_eq!(snapshot.compactions_completed, 1);
984        assert_eq!(snapshot.total_duration_ms, 42);
985        assert_eq!(snapshot.keys_processed, 100);
986        assert_eq!(snapshot.tombstones_removed, 5);
987    }
988
989    #[test]
990    fn test_compaction_stats_thread_safety() {
991        let stats = Arc::new(CompactionStats::default());
992
993        let handles: Vec<_> = (0..10)
994            .map(|_| {
995                let stats_clone = Arc::clone(&stats);
996                std::thread::spawn(move || {
997                    for _ in 0..100 {
998                        stats_clone.bytes_read.fetch_add(1, Ordering::Relaxed);
999                        stats_clone.keys_processed.fetch_add(1, Ordering::Relaxed);
1000                    }
1001                })
1002            })
1003            .collect();
1004
1005        for handle in handles {
1006            handle.join().expect("thread should complete");
1007        }
1008
1009        let snapshot = stats.snapshot();
1010        assert_eq!(snapshot.bytes_read, 1000);
1011        assert_eq!(snapshot.keys_processed, 1000);
1012    }
1013
    // ====== Throttling tests ======
1015
1016    #[test]
1017    fn test_throttler_disabled() {
1018        let mut throttler = CompactionThrottler::new(0);
1019        assert!(!throttler.is_enabled());
1020
1021        // Should return immediately, no sleeping
1022        let start = Instant::now();
1023        throttler.throttle(1_000_000);
1024        let elapsed = start.elapsed();
1025
1026        // Should complete nearly instantly
1027        assert!(elapsed < Duration::from_millis(50));
1028    }
1029
1030    #[test]
1031    fn test_throttler_enabled() {
1032        let mut throttler = CompactionThrottler::new(10_000); // 10 KB/s
1033        assert!(throttler.is_enabled());
1034
1035        // Write 20 KB, should take ~2 seconds at 10 KB/s
1036        let start = Instant::now();
1037        throttler.throttle(20_000);
1038        let elapsed = start.elapsed();
1039
1040        // Should have throttled for approximately 2 seconds
1041        assert!(
1042            elapsed >= Duration::from_millis(1500),
1043            "Expected >= 1.5s delay, got {:?}",
1044            elapsed
1045        );
1046    }
1047
1048    #[test]
1049    fn test_throttler_small_writes_no_delay() {
1050        let mut throttler = CompactionThrottler::new(1_000_000); // 1 MB/s
1051        assert!(throttler.is_enabled());
1052
1053        // Write small amount, should complete quickly
1054        let start = Instant::now();
1055        throttler.throttle(100);
1056        let elapsed = start.elapsed();
1057
1058        assert!(elapsed < Duration::from_millis(50));
1059    }
1060
    // ====== Tombstone TTL GC tests ======
1062
1063    #[test]
1064    fn test_tombstone_registration() {
1065        let config = SSTableConfig::default();
1066        let mut executor = CompactionExecutor::new(config);
1067
1068        let key = Key::from_str("test_key");
1069        let created_at = Instant::now();
1070        executor.register_tombstone(key.clone(), created_at);
1071
1072        // Should not be expired with default 7-day TTL
1073        assert!(!executor.is_tombstone_expired(&key));
1074    }
1075
1076    #[test]
1077    fn test_tombstone_expiry_with_short_ttl() {
1078        let config = SSTableConfig::default();
1079        let compaction_config = CompactionConfig {
1080            tombstone_ttl: Duration::from_millis(1), // Very short TTL for testing
1081            ..Default::default()
1082        };
1083        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
1084
1085        let key = Key::from_str("expired_key");
1086        // Create tombstone "in the past" by using an Instant that's old enough
1087        let old_time = Instant::now() - Duration::from_millis(10);
1088        executor.register_tombstone(key.clone(), old_time);
1089
1090        // Should be expired now
1091        assert!(executor.is_tombstone_expired(&key));
1092    }
1093
1094    #[test]
1095    fn test_tombstone_not_expired() {
1096        let config = SSTableConfig::default();
1097        let compaction_config = CompactionConfig {
1098            tombstone_ttl: Duration::from_secs(3600), // 1 hour TTL
1099            ..Default::default()
1100        };
1101        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
1102
1103        let key = Key::from_str("fresh_key");
1104        executor.register_tombstone(key.clone(), Instant::now());
1105
1106        // Should NOT be expired (just created)
1107        assert!(!executor.is_tombstone_expired(&key));
1108    }
1109
1110    #[test]
1111    fn test_unknown_tombstone_not_expired() {
1112        let config = SSTableConfig::default();
1113        let executor = CompactionExecutor::new(config);
1114
1115        // Key not registered as tombstone
1116        let key = Key::from_str("unknown_key");
1117        assert!(!executor.is_tombstone_expired(&key));
1118    }
1119
    // ====== Edge case tests ======
1121
1122    #[test]
1123    fn test_plan_compaction_empty_source() {
1124        let config = CompactionConfig::default();
1125        let planner = CompactionPlanner::new(config);
1126
1127        let task = planner.plan_compaction(0, Vec::new(), Vec::new());
1128        assert!(task.is_none());
1129    }
1130
1131    #[test]
1132    fn test_plan_compaction_single_sstable() {
1133        let config = CompactionConfig::default();
1134        let planner = CompactionPlanner::new(config);
1135
1136        let source = vec![SSTableMetadata {
1137            path: PathBuf::from("single.sst"),
1138            min_key: Key::from_str("key_001"),
1139            max_key: Key::from_str("key_010"),
1140            num_entries: 10,
1141            file_size: 1000,
1142            level: 0,
1143        }];
1144
1145        // L0 takes all SSTables, even just one
1146        let task = planner.plan_compaction(0, source, Vec::new());
1147        assert!(task.is_some());
1148        let task = task.expect("task should be Some for L0");
1149        assert_eq!(task.source_sstables.len(), 1);
1150    }
1151
1152    #[test]
1153    fn test_size_tiered_empty_input() {
1154        let config = CompactionConfig {
1155            strategy: CompactionStrategy::SizeTiered,
1156            ..Default::default()
1157        };
1158        let planner = CompactionPlanner::new(config);
1159
1160        let task = planner.plan_size_tiered_compaction(Vec::new());
1161        assert!(task.is_none());
1162    }
1163
1164    #[test]
1165    fn test_find_overlapping_empty_source() {
1166        let config = CompactionConfig::default();
1167        let planner = CompactionPlanner::new(config);
1168
1169        let target = vec![SSTableMetadata {
1170            path: PathBuf::from("t1.sst"),
1171            min_key: Key::from_str("key_000"),
1172            max_key: Key::from_str("key_010"),
1173            num_entries: 10,
1174            file_size: 1000,
1175            level: 1,
1176        }];
1177
1178        let overlapping = planner.find_overlapping_sstables(&[], &target);
1179        assert!(overlapping.is_empty());
1180    }
1181
1182    #[test]
1183    fn test_find_overlapping_no_overlap() {
1184        let config = CompactionConfig::default();
1185        let planner = CompactionPlanner::new(config);
1186
1187        let source = vec![SSTableMetadata {
1188            path: PathBuf::from("s1.sst"),
1189            min_key: Key::from_str("aaa"),
1190            max_key: Key::from_str("bbb"),
1191            num_entries: 10,
1192            file_size: 1000,
1193            level: 0,
1194        }];
1195
1196        let target = vec![SSTableMetadata {
1197            path: PathBuf::from("t1.sst"),
1198            min_key: Key::from_str("zzz_000"),
1199            max_key: Key::from_str("zzz_999"),
1200            num_entries: 10,
1201            file_size: 1000,
1202            level: 1,
1203        }];
1204
1205        let overlapping = planner.find_overlapping_sstables(&source, &target);
1206        assert!(overlapping.is_empty());
1207    }
1208
1209    #[test]
1210    fn test_compaction_config_defaults() {
1211        let config = CompactionConfig::default();
1212        assert_eq!(config.strategy, CompactionStrategy::LevelBased);
1213        assert_eq!(config.l0_threshold, 4);
1214        assert_eq!(config.level_multiplier, 10);
1215        assert_eq!(config.min_sstable_size, 1024);
1216        assert_eq!(config.size_ratio, 2.0);
1217        assert_eq!(config.min_tier_size, 4);
1218        assert_eq!(config.max_compaction_bytes_per_sec, 0);
1219        assert_eq!(config.tombstone_ttl, Duration::from_secs(7 * 24 * 3600));
1220    }
1221
1222    #[test]
1223    fn test_executor_stats_accessible() {
1224        let executor = CompactionExecutor::new(SSTableConfig::default());
1225        let snapshot = executor.stats_snapshot();
1226        assert_eq!(snapshot.compactions_completed, 0);
1227        assert_eq!(snapshot.bytes_read, 0);
1228    }
1229
1230    #[test]
1231    fn test_size_tiered_preserves_level_info() {
1232        let config = CompactionConfig {
1233            strategy: CompactionStrategy::SizeTiered,
1234            min_sstable_size: 100,
1235            size_ratio: 2.0,
1236            min_tier_size: 2,
1237            ..Default::default()
1238        };
1239        let planner = CompactionPlanner::new(config);
1240
1241        // SSTables at level 2
1242        let sstables = vec![
1243            make_metadata("a.sst", 1000, 2),
1244            make_metadata("b.sst", 1500, 2),
1245        ];
1246
1247        let task = planner.plan_size_tiered_compaction(sstables);
1248        assert!(task.is_some());
1249        let task = task.expect("task should be Some");
1250        assert_eq!(task.source_level, 2);
1251        assert_eq!(task.target_level, 3);
1252    }
1253
1254    #[test]
1255    fn test_level_target_size_l0() {
1256        let config = CompactionConfig::default();
1257        let planner = CompactionPlanner::new(config);
1258        assert_eq!(planner.level_target_size(0), 0);
1259    }
1260
    // ====== Integration-style tests with real SSTable I/O ======
1262
1263    #[test]
1264    fn test_executor_compaction_with_stats() {
1265        let temp_dir =
1266            std::env::temp_dir().join(format!("amaters_compaction_test_{}", std::process::id()));
1267        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
1268
1269        let sstable_config = SSTableConfig::default();
1270
1271        // Create two SSTables with some entries
1272        let path1 = temp_dir.join("L0_00000001.sst");
1273        let path2 = temp_dir.join("L0_00000002.sst");
1274
1275        create_test_sstable(
1276            &path1,
1277            &sstable_config,
1278            &[("key_01", "val_01"), ("key_02", "val_02")],
1279        );
1280        create_test_sstable(
1281            &path2,
1282            &sstable_config,
1283            &[("key_03", "val_03"), ("key_04", "val_04")],
1284        );
1285
1286        let meta1 = make_file_metadata(&path1, 0);
1287        let meta2 = make_file_metadata(&path2, 0);
1288
1289        let task = CompactionTask {
1290            source_level: 0,
1291            target_level: 1,
1292            source_sstables: vec![meta1, meta2],
1293            target_sstables: Vec::new(),
1294        };
1295
1296        let mut executor = CompactionExecutor::new(sstable_config);
1297        let mut next_id = 100u64;
1298
1299        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
1300        assert!(result.is_ok(), "compaction should succeed");
1301
1302        let output = result.expect("should have output");
1303        assert!(!output.is_empty(), "should produce output SSTables");
1304
1305        let snapshot = executor.stats_snapshot();
1306        assert!(snapshot.bytes_read > 0, "should have read bytes");
1307        assert!(snapshot.bytes_written > 0, "should have written bytes");
1308        assert_eq!(snapshot.compactions_completed, 1);
1309        assert_eq!(snapshot.files_merged, 2);
1310        assert!(
1311            snapshot.keys_processed >= 4,
1312            "should have processed at least 4 keys"
1313        );
1314        assert!(
1315            snapshot.total_duration_ms < 10_000,
1316            "should complete quickly"
1317        );
1318
1319        // Cleanup
1320        let _ = std::fs::remove_dir_all(&temp_dir);
1321    }
1322
1323    #[test]
1324    fn test_executor_compaction_with_throttling() {
1325        let temp_dir =
1326            std::env::temp_dir().join(format!("amaters_throttle_test_{}", std::process::id()));
1327        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
1328
1329        let sstable_config = SSTableConfig::default();
1330
1331        // Create SSTables
1332        let path1 = temp_dir.join("L0_00000001.sst");
1333        create_test_sstable(
1334            &path1,
1335            &sstable_config,
1336            &[("key_01", "val_01"), ("key_02", "val_02")],
1337        );
1338
1339        let meta1 = make_file_metadata(&path1, 0);
1340
1341        let compaction_config = CompactionConfig {
1342            max_compaction_bytes_per_sec: 0, // unlimited for this basic test
1343            ..Default::default()
1344        };
1345
1346        let task = CompactionTask {
1347            source_level: 0,
1348            target_level: 1,
1349            source_sstables: vec![meta1],
1350            target_sstables: Vec::new(),
1351        };
1352
1353        let mut executor =
1354            CompactionExecutor::with_compaction_config(sstable_config, compaction_config);
1355        let mut next_id = 200u64;
1356
1357        let start = Instant::now();
1358        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
1359        let elapsed = start.elapsed();
1360
1361        assert!(result.is_ok());
1362        // With unlimited throttle, should complete quickly
1363        assert!(elapsed < Duration::from_secs(5));
1364
1365        // Cleanup
1366        let _ = std::fs::remove_dir_all(&temp_dir);
1367    }
1368
    // ====== Helper functions ======
1370
1371    fn make_metadata(name: &str, file_size: u64, level: usize) -> SSTableMetadata {
1372        SSTableMetadata {
1373            path: PathBuf::from(name),
1374            min_key: Key::from_str(&format!("{}_min", name)),
1375            max_key: Key::from_str(&format!("{}_max", name)),
1376            num_entries: 10,
1377            file_size,
1378            level,
1379        }
1380    }
1381
1382    fn create_test_sstable(path: &Path, config: &SSTableConfig, entries: &[(&str, &str)]) {
1383        let mut writer =
1384            SSTableWriter::new(path, config.clone()).expect("should create SSTable writer");
1385        for (k, v) in entries {
1386            let key = Key::from_str(k);
1387            let value = CipherBlob::new(v.as_bytes().to_vec());
1388            writer.add(key, value).expect("should add entry");
1389        }
1390        writer.finish().expect("should finish writing");
1391    }
1392
1393    fn make_file_metadata(path: &Path, level: usize) -> SSTableMetadata {
1394        let file_size = std::fs::metadata(path)
1395            .expect("SSTable file should exist")
1396            .len();
1397
1398        // Read the SSTable to get key range
1399        let reader = SSTableReader::open(path).expect("should open SSTable");
1400        let entries = reader.iter().expect("should read entries");
1401
1402        let min_key = entries
1403            .first()
1404            .map(|(k, _)| k.clone())
1405            .unwrap_or_else(|| Key::from_str(""));
1406        let max_key = entries
1407            .last()
1408            .map(|(k, _)| k.clone())
1409            .unwrap_or_else(|| Key::from_str(""));
1410
1411        SSTableMetadata {
1412            path: path.to_path_buf(),
1413            min_key,
1414            max_key,
1415            num_entries: entries.len(),
1416            file_size,
1417            level,
1418        }
1419    }
1420}