// amaters_core/storage/compaction.rs
1//! Compaction strategy for LSM-Tree
2//!
3//! Implements level-based and size-tiered compaction strategies to:
4//! - Merge SSTables from L0 to L1
5//! - Merge overlapping SSTables within levels
6//! - Remove tombstones (deleted keys) with TTL-based garbage collection
7//! - Maintain level size targets
8//! - Track compaction statistics with atomic counters
9//! - Throttle compaction write rate
10
11use crate::error::{AmateRSError, ErrorContext, Result};
12use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
13use crate::types::{CipherBlob, Key};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::{Duration, Instant};
19
/// Compaction strategy selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Level-based compaction (default): merges L0 into L1 and maintains
    /// per-level size targets (see `CompactionPlanner`).
    LevelBased,
    /// Size-tiered compaction: groups SSTables of similar size and merges
    /// a tier once it holds enough files.
    SizeTiered,
}
28
/// Compaction configuration
///
/// Tuning knobs for both level-based and size-tiered compaction, plus
/// write-rate throttling and tombstone garbage collection.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Strategy to use (level-based by default)
    pub strategy: CompactionStrategy,
    /// L0 compaction threshold: compact L0 once it holds this many SSTables
    pub l0_threshold: usize,
    /// Level size multiplier: each level's target size is this factor times
    /// the previous level's target
    pub level_multiplier: usize,
    /// Base level size (L1 target size in bytes)
    pub base_level_size: u64,
    /// Maximum compaction size (bytes) selected per compaction batch
    pub max_compaction_bytes: u64,
    /// Minimum SSTable size for size-tiered compaction (bytes); smaller
    /// files are ignored when grouping tiers
    pub min_sstable_size: u64,
    /// Size ratio for grouping SSTables in size-tiered compaction
    /// SSTables within this ratio of each other are in the same tier
    pub size_ratio: f64,
    /// Minimum number of SSTables in a size tier to trigger compaction
    pub min_tier_size: usize,
    /// Maximum compaction write rate in bytes per second (0 = unlimited)
    pub max_compaction_bytes_per_sec: u64,
    /// Tombstone time-to-live: tombstones older than this are garbage collected
    pub tombstone_ttl: Duration,
}
54
55impl Default for CompactionConfig {
56    fn default() -> Self {
57        Self {
58            strategy: CompactionStrategy::LevelBased,
59            l0_threshold: 4,
60            level_multiplier: 10,
61            base_level_size: 10 * 1024 * 1024,       // 10 MB
62            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
63            min_sstable_size: 1024,                  // 1 KB
64            size_ratio: 2.0,
65            min_tier_size: 4,
66            max_compaction_bytes_per_sec: 0, // unlimited
67            tombstone_ttl: Duration::from_secs(7 * 24 * 3600), // 7 days
68        }
69    }
70}
71
/// Compaction task
///
/// A planned unit of work: merge `source_sstables` (plus any overlapping
/// `target_sstables`) into new SSTables at `target_level`.
#[derive(Debug, Clone)]
pub struct CompactionTask {
    /// Source level the inputs are taken from
    pub source_level: usize,
    /// Target level the merged output is written to
    pub target_level: usize,
    /// SSTables to compact from source level
    pub source_sstables: Vec<SSTableMetadata>,
    /// SSTables to merge from target level (if any overlap the source key range)
    pub target_sstables: Vec<SSTableMetadata>,
}
84
/// Thread-safe compaction statistics tracked with atomic counters
///
/// All counters are cumulative and updated with `Ordering::Relaxed`;
/// use [`CompactionStats::snapshot`] for a plain-value copy.
pub struct CompactionStats {
    /// Total bytes read during compaction
    pub bytes_read: AtomicU64,
    /// Total bytes written during compaction
    pub bytes_written: AtomicU64,
    /// Total files merged
    pub files_merged: AtomicU64,
    /// Total compactions completed
    pub compactions_completed: AtomicU64,
    /// Total duration in milliseconds
    pub total_duration_ms: AtomicU64,
    /// Total keys processed
    pub keys_processed: AtomicU64,
    /// Total tombstones removed
    pub tombstones_removed: AtomicU64,
}
102
103impl Default for CompactionStats {
104    fn default() -> Self {
105        Self {
106            bytes_read: AtomicU64::new(0),
107            bytes_written: AtomicU64::new(0),
108            files_merged: AtomicU64::new(0),
109            compactions_completed: AtomicU64::new(0),
110            total_duration_ms: AtomicU64::new(0),
111            keys_processed: AtomicU64::new(0),
112            tombstones_removed: AtomicU64::new(0),
113        }
114    }
115}
116
117impl std::fmt::Debug for CompactionStats {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("CompactionStats")
120            .field("bytes_read", &self.bytes_read.load(Ordering::Relaxed))
121            .field("bytes_written", &self.bytes_written.load(Ordering::Relaxed))
122            .field("files_merged", &self.files_merged.load(Ordering::Relaxed))
123            .field(
124                "compactions_completed",
125                &self.compactions_completed.load(Ordering::Relaxed),
126            )
127            .field(
128                "total_duration_ms",
129                &self.total_duration_ms.load(Ordering::Relaxed),
130            )
131            .field(
132                "keys_processed",
133                &self.keys_processed.load(Ordering::Relaxed),
134            )
135            .field(
136                "tombstones_removed",
137                &self.tombstones_removed.load(Ordering::Relaxed),
138            )
139            .finish()
140    }
141}
142
143impl CompactionStats {
144    /// Create a snapshot of current stats as simple values
145    pub fn snapshot(&self) -> CompactionStatsSnapshot {
146        CompactionStatsSnapshot {
147            bytes_read: self.bytes_read.load(Ordering::Relaxed),
148            bytes_written: self.bytes_written.load(Ordering::Relaxed),
149            files_merged: self.files_merged.load(Ordering::Relaxed),
150            compactions_completed: self.compactions_completed.load(Ordering::Relaxed),
151            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
152            keys_processed: self.keys_processed.load(Ordering::Relaxed),
153            tombstones_removed: self.tombstones_removed.load(Ordering::Relaxed),
154        }
155    }
156}
157
/// Non-atomic snapshot of compaction statistics
///
/// Plain-value copy produced by [`CompactionStats::snapshot`]; safe to
/// clone, compare, and serialize without touching the live counters.
#[derive(Debug, Clone, Default)]
pub struct CompactionStatsSnapshot {
    /// Total bytes read during compaction
    pub bytes_read: u64,
    /// Total bytes written during compaction
    pub bytes_written: u64,
    /// Total files merged
    pub files_merged: u64,
    /// Total compactions completed
    pub compactions_completed: u64,
    /// Total duration in milliseconds
    pub total_duration_ms: u64,
    /// Total keys processed
    pub keys_processed: u64,
    /// Total tombstones removed
    pub tombstones_removed: u64,
}
176
/// Compaction write throttler
///
/// Limits the rate of compaction writes to avoid overwhelming I/O.
/// Uses `std::thread::sleep` for tokio-free operation. Do not call from
/// async contexts: the sleep blocks the calling thread.
#[derive(Debug)]
pub struct CompactionThrottler {
    /// Maximum bytes per second (0 = unlimited)
    max_bytes_per_sec: u64,
    /// Bytes written in the current tracking window
    bytes_in_window: u64,
    /// Start time of the current tracking window
    window_start: Instant,
}

impl CompactionThrottler {
    /// Create a new throttler with the given rate limit
    /// (`max_bytes_per_sec == 0` disables throttling entirely).
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            bytes_in_window: 0,
            window_start: Instant::now(),
        }
    }

    /// Record bytes written and sleep if the rate limit is exceeded.
    ///
    /// Sleeps just long enough that `bytes_in_window / elapsed` stays at or
    /// below `max_bytes_per_sec`, then resets the tracking window roughly
    /// once per second to avoid accumulation drift.
    pub fn throttle(&mut self, bytes_written: u64) {
        if self.max_bytes_per_sec == 0 {
            return; // unlimited
        }

        self.bytes_in_window += bytes_written;

        let elapsed_secs = self.window_start.elapsed().as_secs_f64();

        // Time the bytes written so far *should* have taken at the limit.
        let expected_secs = self.bytes_in_window as f64 / self.max_bytes_per_sec as f64;

        if expected_secs > elapsed_secs {
            std::thread::sleep(Duration::from_secs_f64(expected_secs - elapsed_secs));
        }

        // Reset window periodically (every second) to avoid accumulation drift.
        // BUGFIX: re-measure elapsed time *after* the potential sleep above.
        // The original reused the pre-sleep value, so a window could grow well
        // past one second without being reset until the following call.
        if self.window_start.elapsed().as_secs_f64() >= 1.0 {
            self.bytes_in_window = 0;
            self.window_start = Instant::now();
        }
    }

    /// Check if throttling is enabled
    pub fn is_enabled(&self) -> bool {
        self.max_bytes_per_sec > 0
    }
}
232
/// A size tier: a group of SSTables with similar sizes
/// (largest at most `size_ratio` times the tier's smallest file).
#[derive(Debug, Clone)]
pub struct SizeTier {
    /// SSTables in this tier (ascending by file size; see `group_by_size_tier`)
    pub sstables: Vec<SSTableMetadata>,
    /// Average file size in this tier, in bytes (integer division)
    pub avg_size: u64,
}
241
/// Compaction planner
///
/// Stateless decision logic: determines when levels need compaction and
/// which SSTables should be merged, driven by a `CompactionConfig`.
pub struct CompactionPlanner {
    /// Tuning parameters shared by all planning methods
    config: CompactionConfig,
}
246
247impl CompactionPlanner {
248    /// Create a new compaction planner
249    pub fn new(config: CompactionConfig) -> Self {
250        Self { config }
251    }
252
253    /// Check if L0 needs compaction
254    pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
255        l0_sstable_count >= self.config.l0_threshold
256    }
257
258    /// Check if a level needs compaction
259    pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
260        if level == 0 {
261            return false; // L0 uses count-based threshold
262        }
263
264        let target_size = self.level_target_size(level);
265        level_size > target_size
266    }
267
268    /// Calculate target size for a level
269    pub fn level_target_size(&self, level: usize) -> u64 {
270        if level == 0 {
271            return 0; // L0 doesn't have a size target
272        }
273
274        self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
275    }
276
277    /// Plan a compaction task (level-based strategy)
278    pub fn plan_compaction(
279        &self,
280        source_level: usize,
281        source_sstables: Vec<SSTableMetadata>,
282        target_sstables: Vec<SSTableMetadata>,
283    ) -> Option<CompactionTask> {
284        if source_sstables.is_empty() {
285            return None;
286        }
287
288        // For L0 → L1, take all L0 SSTables
289        let source_to_compact = if source_level == 0 {
290            source_sstables
291        } else {
292            // For L1+, select SSTables based on size
293            self.select_sstables_for_compaction(source_sstables)
294        };
295
296        if source_to_compact.is_empty() {
297            return None;
298        }
299
300        // Find overlapping SSTables in target level
301        let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
302
303        Some(CompactionTask {
304            source_level,
305            target_level: source_level + 1,
306            source_sstables: source_to_compact,
307            target_sstables: target_to_merge,
308        })
309    }
310
311    /// Plan a size-tiered compaction task
312    ///
313    /// Groups SSTables by similar size (within `size_ratio` of each other).
314    /// When a tier has at least `min_tier_size` SSTables, they are merged.
315    pub fn plan_size_tiered_compaction(
316        &self,
317        sstables: Vec<SSTableMetadata>,
318    ) -> Option<CompactionTask> {
319        let tiers = self.group_by_size_tier(sstables);
320
321        // Find the first tier that has enough SSTables to trigger compaction
322        for tier in tiers {
323            if tier.sstables.len() >= self.config.min_tier_size {
324                // Determine the appropriate target level
325                // Size-tiered puts output into level based on size
326                let max_level = tier.sstables.iter().map(|s| s.level).max().unwrap_or(0);
327                let target_level = max_level + 1;
328
329                return Some(CompactionTask {
330                    source_level: max_level,
331                    target_level,
332                    source_sstables: tier.sstables,
333                    target_sstables: Vec::new(),
334                });
335            }
336        }
337
338        None
339    }
340
341    /// Group SSTables into size tiers
342    ///
343    /// SSTables are grouped such that the largest file in a tier is at most
344    /// `size_ratio` times the smallest file in the same tier.
345    pub fn group_by_size_tier(&self, mut sstables: Vec<SSTableMetadata>) -> Vec<SizeTier> {
346        if sstables.is_empty() {
347            return Vec::new();
348        }
349
350        // Sort by file size
351        sstables.sort_by_key(|s| s.file_size);
352
353        // Filter out SSTables smaller than minimum size
354        let sstables: Vec<SSTableMetadata> = sstables
355            .into_iter()
356            .filter(|s| s.file_size >= self.config.min_sstable_size)
357            .collect();
358
359        if sstables.is_empty() {
360            return Vec::new();
361        }
362
363        let mut tiers: Vec<SizeTier> = Vec::new();
364        let mut current_tier_sstables: Vec<SSTableMetadata> = Vec::new();
365        let mut tier_min_size: u64 = 0;
366
367        for sstable in sstables {
368            if current_tier_sstables.is_empty() {
369                tier_min_size = sstable.file_size;
370                current_tier_sstables.push(sstable);
371            } else if (sstable.file_size as f64) <= (tier_min_size as f64 * self.config.size_ratio)
372            {
373                // Within the size ratio, add to current tier
374                current_tier_sstables.push(sstable);
375            } else {
376                // Start a new tier
377                let avg_size = current_tier_sstables
378                    .iter()
379                    .map(|s| s.file_size)
380                    .sum::<u64>()
381                    / current_tier_sstables.len().max(1) as u64;
382                tiers.push(SizeTier {
383                    sstables: std::mem::take(&mut current_tier_sstables),
384                    avg_size,
385                });
386                tier_min_size = sstable.file_size;
387                current_tier_sstables.push(sstable);
388            }
389        }
390
391        // Don't forget the last tier
392        if !current_tier_sstables.is_empty() {
393            let avg_size = current_tier_sstables
394                .iter()
395                .map(|s| s.file_size)
396                .sum::<u64>()
397                / current_tier_sstables.len().max(1) as u64;
398            tiers.push(SizeTier {
399                sstables: current_tier_sstables,
400                avg_size,
401            });
402        }
403
404        tiers
405    }
406
407    /// Select SSTables for compaction (L1+)
408    fn select_sstables_for_compaction(
409        &self,
410        sstables: Vec<SSTableMetadata>,
411    ) -> Vec<SSTableMetadata> {
412        let mut selected = Vec::new();
413        let mut total_size = 0u64;
414
415        for sstable in sstables {
416            if total_size + sstable.file_size > self.config.max_compaction_bytes {
417                break;
418            }
419
420            total_size += sstable.file_size;
421            selected.push(sstable);
422
423            // Compact at least 2 SSTables
424            if selected.len() >= 2 {
425                break;
426            }
427        }
428
429        selected
430    }
431
432    /// Find overlapping SSTables in target level
433    pub fn find_overlapping_sstables(
434        &self,
435        source_sstables: &[SSTableMetadata],
436        target_sstables: &[SSTableMetadata],
437    ) -> Vec<SSTableMetadata> {
438        if source_sstables.is_empty() {
439            return Vec::new();
440        }
441
442        // Find min and max keys from source SSTables (safe: checked is_empty above)
443        let min_key = source_sstables
444            .iter()
445            .map(|s| &s.min_key)
446            .min()
447            .expect("source_sstables is non-empty");
448
449        let max_key = source_sstables
450            .iter()
451            .map(|s| &s.max_key)
452            .max()
453            .expect("source_sstables is non-empty");
454
455        // Find all target SSTables that overlap with this range
456        target_sstables
457            .iter()
458            .filter(|sstable| {
459                // Check if ranges overlap
460                !(&sstable.max_key < min_key || &sstable.min_key > max_key)
461            })
462            .cloned()
463            .collect()
464    }
465}
466
/// Tombstone entry with timestamp for TTL-based garbage collection
///
/// NOTE(review): not referenced by any code visible in this file — the
/// executor tracks tombstones in a `BTreeMap<Key, Instant>` instead.
/// Confirm whether this type is used elsewhere before removing it.
#[derive(Debug, Clone)]
pub struct TombstoneEntry {
    /// The key that was deleted
    pub key: Key,
    /// When the tombstone was created
    pub created_at: Instant,
}
475
/// Compaction executor
///
/// Performs the actual merge: reads input SSTables, drops tombstones,
/// writes new SSTables, records statistics, and throttles writes.
/// Mutating operations take `&mut self`, so callers need exclusive access.
pub struct CompactionExecutor {
    /// SSTable format/writer configuration used for output files
    config: SSTableConfig,
    /// Compaction tuning (tombstone TTL, throttle rate, ...)
    compaction_config: CompactionConfig,
    /// Shared atomic counters updated during compaction
    stats: Arc<CompactionStats>,
    /// Write-rate limiter applied after each output SSTable is finished
    throttler: CompactionThrottler,
    /// Active tombstones with their creation times
    tombstones: BTreeMap<Key, Instant>,
}
485
486impl CompactionExecutor {
487    /// Create a new compaction executor
488    pub fn new(config: SSTableConfig) -> Self {
489        Self {
490            config,
491            compaction_config: CompactionConfig::default(),
492            stats: Arc::new(CompactionStats::default()),
493            throttler: CompactionThrottler::new(0),
494            tombstones: BTreeMap::new(),
495        }
496    }
497
498    /// Create a new compaction executor with compaction configuration
499    pub fn with_compaction_config(
500        config: SSTableConfig,
501        compaction_config: CompactionConfig,
502    ) -> Self {
503        let throttler = CompactionThrottler::new(compaction_config.max_compaction_bytes_per_sec);
504        Self {
505            config,
506            compaction_config,
507            stats: Arc::new(CompactionStats::default()),
508            throttler,
509            tombstones: BTreeMap::new(),
510        }
511    }
512
513    /// Register a tombstone with its creation time
514    pub fn register_tombstone(&mut self, key: Key, created_at: Instant) {
515        self.tombstones.insert(key, created_at);
516    }
517
518    /// Check if a tombstone has expired based on TTL
519    fn is_tombstone_expired(&self, key: &Key) -> bool {
520        if let Some(created_at) = self.tombstones.get(key) {
521            created_at.elapsed() >= self.compaction_config.tombstone_ttl
522        } else {
523            false
524        }
525    }
526
527    /// Execute a compaction task
528    pub fn execute_compaction(
529        &mut self,
530        task: CompactionTask,
531        output_dir: &Path,
532        next_sstable_id: &mut u64,
533    ) -> Result<Vec<SSTableMetadata>> {
534        let start_time = Instant::now();
535
536        // Track files merged
537        let files_merged = (task.source_sstables.len() + task.target_sstables.len()) as u64;
538        self.stats
539            .files_merged
540            .fetch_add(files_merged, Ordering::Relaxed);
541
542        // Collect all entries from source and target SSTables
543        let mut all_entries: BTreeMap<Key, Option<CipherBlob>> = BTreeMap::new();
544
545        // Read from source SSTables
546        for sstable in &task.source_sstables {
547            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
548            self.stats
549                .bytes_read
550                .fetch_add(sstable.file_size, Ordering::Relaxed);
551        }
552
553        // Read from target SSTables (overlapping)
554        for sstable in &task.target_sstables {
555            self.read_sstable_entries(&sstable.path, &mut all_entries)?;
556            self.stats
557                .bytes_read
558                .fetch_add(sstable.file_size, Ordering::Relaxed);
559        }
560
561        // Write merged SSTables to target level
562        let output_sstables = self.write_compacted_sstables(
563            all_entries,
564            task.target_level,
565            output_dir,
566            next_sstable_id,
567        )?;
568
569        self.stats
570            .compactions_completed
571            .fetch_add(1, Ordering::Relaxed);
572
573        let duration_ms = start_time.elapsed().as_millis() as u64;
574        self.stats
575            .total_duration_ms
576            .fetch_add(duration_ms, Ordering::Relaxed);
577
578        Ok(output_sstables)
579    }
580
581    /// Read entries from an SSTable
582    fn read_sstable_entries(
583        &self,
584        path: &Path,
585        entries: &mut BTreeMap<Key, Option<CipherBlob>>,
586    ) -> Result<()> {
587        let reader = SSTableReader::open(path)?;
588        let sstable_entries = reader.iter()?;
589
590        for (key, value) in sstable_entries {
591            self.stats.keys_processed.fetch_add(1, Ordering::Relaxed);
592            // Later entries overwrite earlier ones (LSM semantics)
593            entries.insert(key, Some(value));
594        }
595
596        Ok(())
597    }
598
599    /// Write compacted entries to new SSTables
600    fn write_compacted_sstables(
601        &mut self,
602        entries: BTreeMap<Key, Option<CipherBlob>>,
603        target_level: usize,
604        output_dir: &Path,
605        next_id: &mut u64,
606    ) -> Result<Vec<SSTableMetadata>> {
607        let mut output_sstables = Vec::new();
608        let mut current_writer: Option<SSTableWriter> = None;
609        let mut current_path: Option<PathBuf> = None;
610        let mut current_size = 0usize;
611        let mut current_min_key: Option<Key> = None;
612        let mut current_max_key: Option<Key> = None;
613        let mut current_entries = 0usize;
614
615        const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; // 2 MB per SSTable
616
617        for (key, value_opt) in entries {
618            // Handle tombstones (None values)
619            let value = match value_opt {
620                Some(v) => v,
621                None => {
622                    // Check if tombstone has expired (TTL-based GC)
623                    if self.is_tombstone_expired(&key) {
624                        self.stats
625                            .tombstones_removed
626                            .fetch_add(1, Ordering::Relaxed);
627                        // Remove from tombstone tracking
628                        self.tombstones.remove(&key);
629                        continue;
630                    }
631                    // Tombstone not expired yet, still skip writing it
632                    // (original behavior: all tombstones removed during compaction)
633                    self.stats
634                        .tombstones_removed
635                        .fetch_add(1, Ordering::Relaxed);
636                    continue;
637                }
638            };
639
640            // Start new SSTable if needed
641            if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
642                // Finish previous SSTable
643                if let Some(writer) = current_writer.take() {
644                    writer.finish()?;
645
646                    if let (Some(path), Some(min_key), Some(max_key)) = (
647                        current_path.take(),
648                        current_min_key.take(),
649                        current_max_key.take(),
650                    ) {
651                        let file_size = std::fs::metadata(&path)
652                            .map_err(|e| {
653                                AmateRSError::StorageIntegrity(ErrorContext::new(format!(
654                                    "Failed to get SSTable size: {}",
655                                    e
656                                )))
657                            })?
658                            .len();
659
660                        self.stats
661                            .bytes_written
662                            .fetch_add(file_size, Ordering::Relaxed);
663                        self.throttler.throttle(file_size);
664
665                        output_sstables.push(SSTableMetadata {
666                            path,
667                            min_key,
668                            max_key,
669                            num_entries: current_entries,
670                            file_size,
671                            level: target_level,
672                        });
673                    }
674                }
675
676                // Start new SSTable
677                let id = *next_id;
678                *next_id += 1;
679                let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
680                let writer = SSTableWriter::new(&path, self.config.clone())?;
681
682                current_writer = Some(writer);
683                current_path = Some(path);
684                current_size = 0;
685                current_min_key = None;
686                current_max_key = None;
687                current_entries = 0;
688            }
689
690            // Write entry
691            if let Some(ref mut writer) = current_writer {
692                let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
693                writer.add(key.clone(), value)?;
694                current_size += entry_size;
695                current_entries += 1;
696
697                if current_min_key.is_none() {
698                    current_min_key = Some(key.clone());
699                }
700                current_max_key = Some(key);
701            }
702        }
703
704        // Finish final SSTable
705        if let Some(writer) = current_writer {
706            writer.finish()?;
707
708            if let (Some(path), Some(min_key), Some(max_key)) =
709                (current_path, current_min_key, current_max_key)
710            {
711                let file_size = std::fs::metadata(&path)
712                    .map_err(|e| {
713                        AmateRSError::StorageIntegrity(ErrorContext::new(format!(
714                            "Failed to get SSTable size: {}",
715                            e
716                        )))
717                    })?
718                    .len();
719
720                self.stats
721                    .bytes_written
722                    .fetch_add(file_size, Ordering::Relaxed);
723                self.throttler.throttle(file_size);
724
725                output_sstables.push(SSTableMetadata {
726                    path,
727                    min_key,
728                    max_key,
729                    num_entries: current_entries,
730                    file_size,
731                    level: target_level,
732                });
733            }
734        }
735
736        Ok(output_sstables)
737    }
738
739    /// Get compaction statistics
740    pub fn stats(&self) -> &CompactionStats {
741        &self.stats
742    }
743
744    /// Get a snapshot of compaction statistics
745    pub fn stats_snapshot(&self) -> CompactionStatsSnapshot {
746        self.stats.snapshot()
747    }
748}
749
750#[cfg(test)]
751mod tests {
752    use super::*;
753
754    #[test]
755    fn test_compaction_planner_l0_threshold() {
756        let config = CompactionConfig::default();
757        let planner = CompactionPlanner::new(config);
758
759        assert!(!planner.needs_l0_compaction(3));
760        assert!(planner.needs_l0_compaction(4));
761        assert!(planner.needs_l0_compaction(5));
762    }
763
764    #[test]
765    fn test_compaction_planner_level_sizes() {
766        let config = CompactionConfig {
767            base_level_size: 10 * 1024 * 1024, // 10 MB
768            level_multiplier: 10,
769            ..Default::default()
770        };
771        let planner = CompactionPlanner::new(config);
772
773        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024); // 10 MB
774        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024); // 100 MB
775        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024); // 1 GB
776    }
777
778    #[test]
779    fn test_compaction_planner_needs_compaction() {
780        let config = CompactionConfig::default();
781        let planner = CompactionPlanner::new(config);
782
783        // L0 doesn't use size-based threshold
784        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));
785
786        // L1 target is 10 MB
787        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
788        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
789    }
790
791    #[test]
792    fn test_find_overlapping_sstables() {
793        let config = CompactionConfig::default();
794        let planner = CompactionPlanner::new(config);
795
796        let source = vec![SSTableMetadata {
797            path: PathBuf::from("s1.sst"),
798            min_key: Key::from_str("key_005"),
799            max_key: Key::from_str("key_015"),
800            num_entries: 10,
801            file_size: 1000,
802            level: 0,
803        }];
804
805        let target = vec![
806            SSTableMetadata {
807                path: PathBuf::from("t1.sst"),
808                min_key: Key::from_str("key_000"),
809                max_key: Key::from_str("key_010"),
810                num_entries: 10,
811                file_size: 1000,
812                level: 1,
813            },
814            SSTableMetadata {
815                path: PathBuf::from("t2.sst"),
816                min_key: Key::from_str("key_020"),
817                max_key: Key::from_str("key_030"),
818                num_entries: 10,
819                file_size: 1000,
820                level: 1,
821            },
822        ];
823
824        let overlapping = planner.find_overlapping_sstables(&source, &target);
825
826        assert_eq!(overlapping.len(), 1);
827        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
828    }
829
830    // ====== Size-tiered compaction tests ======
831
832    #[test]
833    fn test_size_tiered_grouping_basic() {
834        let config = CompactionConfig {
835            strategy: CompactionStrategy::SizeTiered,
836            min_sstable_size: 100,
837            size_ratio: 2.0,
838            min_tier_size: 4,
839            ..Default::default()
840        };
841        let planner = CompactionPlanner::new(config);
842
843        // Create SSTables with similar sizes (all within 2x of each other)
844        let sstables = vec![
845            make_metadata("a.sst", 1000, 0),
846            make_metadata("b.sst", 1200, 0),
847            make_metadata("c.sst", 1500, 0),
848            make_metadata("d.sst", 1800, 0),
849        ];
850
851        let tiers = planner.group_by_size_tier(sstables);
852
853        // All should be in one tier (1000 * 2.0 = 2000, all are <= 2000)
854        assert_eq!(tiers.len(), 1);
855        assert_eq!(tiers[0].sstables.len(), 4);
856    }
857
858    #[test]
859    fn test_size_tiered_grouping_multiple_tiers() {
860        let config = CompactionConfig {
861            strategy: CompactionStrategy::SizeTiered,
862            min_sstable_size: 100,
863            size_ratio: 2.0,
864            min_tier_size: 2,
865            ..Default::default()
866        };
867        let planner = CompactionPlanner::new(config);
868
869        // Two distinct size groups
870        let sstables = vec![
871            make_metadata("small1.sst", 1000, 0),
872            make_metadata("small2.sst", 1500, 0),
873            make_metadata("big1.sst", 10000, 0),
874            make_metadata("big2.sst", 15000, 0),
875        ];
876
877        let tiers = planner.group_by_size_tier(sstables);
878
879        assert_eq!(tiers.len(), 2);
880        assert_eq!(tiers[0].sstables.len(), 2); // small group
881        assert_eq!(tiers[1].sstables.len(), 2); // big group
882    }
883
884    #[test]
885    fn test_size_tiered_merge_trigger() {
886        let config = CompactionConfig {
887            strategy: CompactionStrategy::SizeTiered,
888            min_sstable_size: 100,
889            size_ratio: 2.0,
890            min_tier_size: 4,
891            ..Default::default()
892        };
893        let planner = CompactionPlanner::new(config);
894
895        // Not enough SSTables in any tier
896        let sstables = vec![
897            make_metadata("a.sst", 1000, 0),
898            make_metadata("b.sst", 1200, 0),
899            make_metadata("c.sst", 1500, 0),
900        ];
901
902        let task = planner.plan_size_tiered_compaction(sstables);
903        assert!(task.is_none(), "Should not trigger with only 3 SSTables");
904
905        // Enough SSTables in a tier
906        let sstables = vec![
907            make_metadata("a.sst", 1000, 0),
908            make_metadata("b.sst", 1200, 0),
909            make_metadata("c.sst", 1500, 0),
910            make_metadata("d.sst", 1800, 0),
911        ];
912
913        let task = planner.plan_size_tiered_compaction(sstables);
914        assert!(
915            task.is_some(),
916            "Should trigger with 4 SSTables in same tier"
917        );
918
919        let task = task.expect("task should be Some");
920        assert_eq!(task.source_sstables.len(), 4);
921        assert_eq!(task.target_level, 1);
922    }
923
924    #[test]
925    fn test_size_tiered_filters_small_sstables() {
926        let config = CompactionConfig {
927            strategy: CompactionStrategy::SizeTiered,
928            min_sstable_size: 500,
929            size_ratio: 2.0,
930            min_tier_size: 4,
931            ..Default::default()
932        };
933        let planner = CompactionPlanner::new(config);
934
935        // All SSTables below minimum size
936        let sstables = vec![
937            make_metadata("a.sst", 100, 0),
938            make_metadata("b.sst", 200, 0),
939            make_metadata("c.sst", 300, 0),
940            make_metadata("d.sst", 400, 0),
941        ];
942
943        let tiers = planner.group_by_size_tier(sstables);
944        assert!(tiers.is_empty());
945    }
946
947    // ====== Compaction stats tests ======
948
949    #[test]
950    fn test_compaction_stats_default() {
951        let stats = CompactionStats::default();
952        let snapshot = stats.snapshot();
953
954        assert_eq!(snapshot.bytes_read, 0);
955        assert_eq!(snapshot.bytes_written, 0);
956        assert_eq!(snapshot.files_merged, 0);
957        assert_eq!(snapshot.compactions_completed, 0);
958        assert_eq!(snapshot.total_duration_ms, 0);
959        assert_eq!(snapshot.keys_processed, 0);
960        assert_eq!(snapshot.tombstones_removed, 0);
961    }
962
963    #[test]
964    fn test_compaction_stats_atomic_updates() {
965        let stats = CompactionStats::default();
966
967        stats.bytes_read.fetch_add(1000, Ordering::Relaxed);
968        stats.bytes_written.fetch_add(500, Ordering::Relaxed);
969        stats.files_merged.fetch_add(3, Ordering::Relaxed);
970        stats.compactions_completed.fetch_add(1, Ordering::Relaxed);
971        stats.total_duration_ms.fetch_add(42, Ordering::Relaxed);
972        stats.keys_processed.fetch_add(100, Ordering::Relaxed);
973        stats.tombstones_removed.fetch_add(5, Ordering::Relaxed);
974
975        let snapshot = stats.snapshot();
976        assert_eq!(snapshot.bytes_read, 1000);
977        assert_eq!(snapshot.bytes_written, 500);
978        assert_eq!(snapshot.files_merged, 3);
979        assert_eq!(snapshot.compactions_completed, 1);
980        assert_eq!(snapshot.total_duration_ms, 42);
981        assert_eq!(snapshot.keys_processed, 100);
982        assert_eq!(snapshot.tombstones_removed, 5);
983    }
984
985    #[test]
986    fn test_compaction_stats_thread_safety() {
987        let stats = Arc::new(CompactionStats::default());
988
989        let handles: Vec<_> = (0..10)
990            .map(|_| {
991                let stats_clone = Arc::clone(&stats);
992                std::thread::spawn(move || {
993                    for _ in 0..100 {
994                        stats_clone.bytes_read.fetch_add(1, Ordering::Relaxed);
995                        stats_clone.keys_processed.fetch_add(1, Ordering::Relaxed);
996                    }
997                })
998            })
999            .collect();
1000
1001        for handle in handles {
1002            handle.join().expect("thread should complete");
1003        }
1004
1005        let snapshot = stats.snapshot();
1006        assert_eq!(snapshot.bytes_read, 1000);
1007        assert_eq!(snapshot.keys_processed, 1000);
1008    }
1009
1010    // ====== Throttling tests ======
1011
1012    #[test]
1013    fn test_throttler_disabled() {
1014        let mut throttler = CompactionThrottler::new(0);
1015        assert!(!throttler.is_enabled());
1016
1017        // Should return immediately, no sleeping
1018        let start = Instant::now();
1019        throttler.throttle(1_000_000);
1020        let elapsed = start.elapsed();
1021
1022        // Should complete nearly instantly
1023        assert!(elapsed < Duration::from_millis(50));
1024    }
1025
1026    #[test]
1027    fn test_throttler_enabled() {
1028        let mut throttler = CompactionThrottler::new(10_000); // 10 KB/s
1029        assert!(throttler.is_enabled());
1030
1031        // Write 20 KB, should take ~2 seconds at 10 KB/s
1032        let start = Instant::now();
1033        throttler.throttle(20_000);
1034        let elapsed = start.elapsed();
1035
1036        // Should have throttled for approximately 2 seconds
1037        assert!(
1038            elapsed >= Duration::from_millis(1500),
1039            "Expected >= 1.5s delay, got {:?}",
1040            elapsed
1041        );
1042    }
1043
1044    #[test]
1045    fn test_throttler_small_writes_no_delay() {
1046        let mut throttler = CompactionThrottler::new(1_000_000); // 1 MB/s
1047        assert!(throttler.is_enabled());
1048
1049        // Write small amount, should complete quickly
1050        let start = Instant::now();
1051        throttler.throttle(100);
1052        let elapsed = start.elapsed();
1053
1054        assert!(elapsed < Duration::from_millis(50));
1055    }
1056
1057    // ====== Tombstone TTL GC tests ======
1058
1059    #[test]
1060    fn test_tombstone_registration() {
1061        let config = SSTableConfig::default();
1062        let mut executor = CompactionExecutor::new(config);
1063
1064        let key = Key::from_str("test_key");
1065        let created_at = Instant::now();
1066        executor.register_tombstone(key.clone(), created_at);
1067
1068        // Should not be expired with default 7-day TTL
1069        assert!(!executor.is_tombstone_expired(&key));
1070    }
1071
1072    #[test]
1073    fn test_tombstone_expiry_with_short_ttl() {
1074        let config = SSTableConfig::default();
1075        let compaction_config = CompactionConfig {
1076            tombstone_ttl: Duration::from_millis(1), // Very short TTL for testing
1077            ..Default::default()
1078        };
1079        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
1080
1081        let key = Key::from_str("expired_key");
1082        // Create tombstone "in the past" by using an Instant that's old enough
1083        let old_time = Instant::now() - Duration::from_millis(10);
1084        executor.register_tombstone(key.clone(), old_time);
1085
1086        // Should be expired now
1087        assert!(executor.is_tombstone_expired(&key));
1088    }
1089
1090    #[test]
1091    fn test_tombstone_not_expired() {
1092        let config = SSTableConfig::default();
1093        let compaction_config = CompactionConfig {
1094            tombstone_ttl: Duration::from_secs(3600), // 1 hour TTL
1095            ..Default::default()
1096        };
1097        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);
1098
1099        let key = Key::from_str("fresh_key");
1100        executor.register_tombstone(key.clone(), Instant::now());
1101
1102        // Should NOT be expired (just created)
1103        assert!(!executor.is_tombstone_expired(&key));
1104    }
1105
1106    #[test]
1107    fn test_unknown_tombstone_not_expired() {
1108        let config = SSTableConfig::default();
1109        let executor = CompactionExecutor::new(config);
1110
1111        // Key not registered as tombstone
1112        let key = Key::from_str("unknown_key");
1113        assert!(!executor.is_tombstone_expired(&key));
1114    }
1115
1116    // ====== Edge case tests ======
1117
1118    #[test]
1119    fn test_plan_compaction_empty_source() {
1120        let config = CompactionConfig::default();
1121        let planner = CompactionPlanner::new(config);
1122
1123        let task = planner.plan_compaction(0, Vec::new(), Vec::new());
1124        assert!(task.is_none());
1125    }
1126
1127    #[test]
1128    fn test_plan_compaction_single_sstable() {
1129        let config = CompactionConfig::default();
1130        let planner = CompactionPlanner::new(config);
1131
1132        let source = vec![SSTableMetadata {
1133            path: PathBuf::from("single.sst"),
1134            min_key: Key::from_str("key_001"),
1135            max_key: Key::from_str("key_010"),
1136            num_entries: 10,
1137            file_size: 1000,
1138            level: 0,
1139        }];
1140
1141        // L0 takes all SSTables, even just one
1142        let task = planner.plan_compaction(0, source, Vec::new());
1143        assert!(task.is_some());
1144        let task = task.expect("task should be Some for L0");
1145        assert_eq!(task.source_sstables.len(), 1);
1146    }
1147
1148    #[test]
1149    fn test_size_tiered_empty_input() {
1150        let config = CompactionConfig {
1151            strategy: CompactionStrategy::SizeTiered,
1152            ..Default::default()
1153        };
1154        let planner = CompactionPlanner::new(config);
1155
1156        let task = planner.plan_size_tiered_compaction(Vec::new());
1157        assert!(task.is_none());
1158    }
1159
1160    #[test]
1161    fn test_find_overlapping_empty_source() {
1162        let config = CompactionConfig::default();
1163        let planner = CompactionPlanner::new(config);
1164
1165        let target = vec![SSTableMetadata {
1166            path: PathBuf::from("t1.sst"),
1167            min_key: Key::from_str("key_000"),
1168            max_key: Key::from_str("key_010"),
1169            num_entries: 10,
1170            file_size: 1000,
1171            level: 1,
1172        }];
1173
1174        let overlapping = planner.find_overlapping_sstables(&[], &target);
1175        assert!(overlapping.is_empty());
1176    }
1177
1178    #[test]
1179    fn test_find_overlapping_no_overlap() {
1180        let config = CompactionConfig::default();
1181        let planner = CompactionPlanner::new(config);
1182
1183        let source = vec![SSTableMetadata {
1184            path: PathBuf::from("s1.sst"),
1185            min_key: Key::from_str("aaa"),
1186            max_key: Key::from_str("bbb"),
1187            num_entries: 10,
1188            file_size: 1000,
1189            level: 0,
1190        }];
1191
1192        let target = vec![SSTableMetadata {
1193            path: PathBuf::from("t1.sst"),
1194            min_key: Key::from_str("zzz_000"),
1195            max_key: Key::from_str("zzz_999"),
1196            num_entries: 10,
1197            file_size: 1000,
1198            level: 1,
1199        }];
1200
1201        let overlapping = planner.find_overlapping_sstables(&source, &target);
1202        assert!(overlapping.is_empty());
1203    }
1204
1205    #[test]
1206    fn test_compaction_config_defaults() {
1207        let config = CompactionConfig::default();
1208        assert_eq!(config.strategy, CompactionStrategy::LevelBased);
1209        assert_eq!(config.l0_threshold, 4);
1210        assert_eq!(config.level_multiplier, 10);
1211        assert_eq!(config.min_sstable_size, 1024);
1212        assert_eq!(config.size_ratio, 2.0);
1213        assert_eq!(config.min_tier_size, 4);
1214        assert_eq!(config.max_compaction_bytes_per_sec, 0);
1215        assert_eq!(config.tombstone_ttl, Duration::from_secs(7 * 24 * 3600));
1216    }
1217
1218    #[test]
1219    fn test_executor_stats_accessible() {
1220        let executor = CompactionExecutor::new(SSTableConfig::default());
1221        let snapshot = executor.stats_snapshot();
1222        assert_eq!(snapshot.compactions_completed, 0);
1223        assert_eq!(snapshot.bytes_read, 0);
1224    }
1225
1226    #[test]
1227    fn test_size_tiered_preserves_level_info() {
1228        let config = CompactionConfig {
1229            strategy: CompactionStrategy::SizeTiered,
1230            min_sstable_size: 100,
1231            size_ratio: 2.0,
1232            min_tier_size: 2,
1233            ..Default::default()
1234        };
1235        let planner = CompactionPlanner::new(config);
1236
1237        // SSTables at level 2
1238        let sstables = vec![
1239            make_metadata("a.sst", 1000, 2),
1240            make_metadata("b.sst", 1500, 2),
1241        ];
1242
1243        let task = planner.plan_size_tiered_compaction(sstables);
1244        assert!(task.is_some());
1245        let task = task.expect("task should be Some");
1246        assert_eq!(task.source_level, 2);
1247        assert_eq!(task.target_level, 3);
1248    }
1249
1250    #[test]
1251    fn test_level_target_size_l0() {
1252        let config = CompactionConfig::default();
1253        let planner = CompactionPlanner::new(config);
1254        assert_eq!(planner.level_target_size(0), 0);
1255    }
1256
1257    // ====== Integration-style tests with real SSTable I/O ======
1258
1259    #[test]
1260    fn test_executor_compaction_with_stats() {
1261        let temp_dir =
1262            std::env::temp_dir().join(format!("amaters_compaction_test_{}", std::process::id()));
1263        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
1264
1265        let sstable_config = SSTableConfig::default();
1266
1267        // Create two SSTables with some entries
1268        let path1 = temp_dir.join("L0_00000001.sst");
1269        let path2 = temp_dir.join("L0_00000002.sst");
1270
1271        create_test_sstable(
1272            &path1,
1273            &sstable_config,
1274            &[("key_01", "val_01"), ("key_02", "val_02")],
1275        );
1276        create_test_sstable(
1277            &path2,
1278            &sstable_config,
1279            &[("key_03", "val_03"), ("key_04", "val_04")],
1280        );
1281
1282        let meta1 = make_file_metadata(&path1, 0);
1283        let meta2 = make_file_metadata(&path2, 0);
1284
1285        let task = CompactionTask {
1286            source_level: 0,
1287            target_level: 1,
1288            source_sstables: vec![meta1, meta2],
1289            target_sstables: Vec::new(),
1290        };
1291
1292        let mut executor = CompactionExecutor::new(sstable_config);
1293        let mut next_id = 100u64;
1294
1295        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
1296        assert!(result.is_ok(), "compaction should succeed");
1297
1298        let output = result.expect("should have output");
1299        assert!(!output.is_empty(), "should produce output SSTables");
1300
1301        let snapshot = executor.stats_snapshot();
1302        assert!(snapshot.bytes_read > 0, "should have read bytes");
1303        assert!(snapshot.bytes_written > 0, "should have written bytes");
1304        assert_eq!(snapshot.compactions_completed, 1);
1305        assert_eq!(snapshot.files_merged, 2);
1306        assert!(
1307            snapshot.keys_processed >= 4,
1308            "should have processed at least 4 keys"
1309        );
1310        assert!(
1311            snapshot.total_duration_ms < 10_000,
1312            "should complete quickly"
1313        );
1314
1315        // Cleanup
1316        let _ = std::fs::remove_dir_all(&temp_dir);
1317    }
1318
1319    #[test]
1320    fn test_executor_compaction_with_throttling() {
1321        let temp_dir =
1322            std::env::temp_dir().join(format!("amaters_throttle_test_{}", std::process::id()));
1323        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");
1324
1325        let sstable_config = SSTableConfig::default();
1326
1327        // Create SSTables
1328        let path1 = temp_dir.join("L0_00000001.sst");
1329        create_test_sstable(
1330            &path1,
1331            &sstable_config,
1332            &[("key_01", "val_01"), ("key_02", "val_02")],
1333        );
1334
1335        let meta1 = make_file_metadata(&path1, 0);
1336
1337        let compaction_config = CompactionConfig {
1338            max_compaction_bytes_per_sec: 0, // unlimited for this basic test
1339            ..Default::default()
1340        };
1341
1342        let task = CompactionTask {
1343            source_level: 0,
1344            target_level: 1,
1345            source_sstables: vec![meta1],
1346            target_sstables: Vec::new(),
1347        };
1348
1349        let mut executor =
1350            CompactionExecutor::with_compaction_config(sstable_config, compaction_config);
1351        let mut next_id = 200u64;
1352
1353        let start = Instant::now();
1354        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
1355        let elapsed = start.elapsed();
1356
1357        assert!(result.is_ok());
1358        // With unlimited throttle, should complete quickly
1359        assert!(elapsed < Duration::from_secs(5));
1360
1361        // Cleanup
1362        let _ = std::fs::remove_dir_all(&temp_dir);
1363    }
1364
1365    // ====== Helper functions ======
1366
1367    fn make_metadata(name: &str, file_size: u64, level: usize) -> SSTableMetadata {
1368        SSTableMetadata {
1369            path: PathBuf::from(name),
1370            min_key: Key::from_str(&format!("{}_min", name)),
1371            max_key: Key::from_str(&format!("{}_max", name)),
1372            num_entries: 10,
1373            file_size,
1374            level,
1375        }
1376    }
1377
1378    fn create_test_sstable(path: &Path, config: &SSTableConfig, entries: &[(&str, &str)]) {
1379        let mut writer =
1380            SSTableWriter::new(path, config.clone()).expect("should create SSTable writer");
1381        for (k, v) in entries {
1382            let key = Key::from_str(k);
1383            let value = CipherBlob::new(v.as_bytes().to_vec());
1384            writer.add(key, value).expect("should add entry");
1385        }
1386        writer.finish().expect("should finish writing");
1387    }
1388
1389    fn make_file_metadata(path: &Path, level: usize) -> SSTableMetadata {
1390        let file_size = std::fs::metadata(path)
1391            .expect("SSTable file should exist")
1392            .len();
1393
1394        // Read the SSTable to get key range
1395        let reader = SSTableReader::open(path).expect("should open SSTable");
1396        let entries = reader.iter().expect("should read entries");
1397
1398        let min_key = entries
1399            .first()
1400            .map(|(k, _)| k.clone())
1401            .unwrap_or_else(|| Key::from_str(""));
1402        let max_key = entries
1403            .last()
1404            .map(|(k, _)| k.clone())
1405            .unwrap_or_else(|| Key::from_str(""));
1406
1407        SSTableMetadata {
1408            path: path.to_path_buf(),
1409            min_key,
1410            max_key,
1411            num_entries: entries.len(),
1412            file_size,
1413            level,
1414        }
1415    }
1416}