Skip to main content

sochdb_storage/
compaction_policy.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Compaction Policy for Version and Tombstone Pruning
//!
//! This module provides intelligent compaction policies that:
//! - Aggressively prune old versions beyond the retention window
//! - Collect tombstones that have expired
//! - Minimize write amplification (WA) and space amplification (SA)
//! - Balance read performance against compaction overhead
//!
//! ## Compaction Strategies
//!
//! 1. **Leveled Compaction**: Traditional RocksDB-style L0 → L1 → ... compaction
//! 2. **Universal Compaction**: Minimize WA for write-heavy workloads
//! 3. **FIFO Compaction**: Age-based TTL with minimal overhead
//! 4. **Tiered Compaction**: Hybrid approach balancing WA and SA
32
33use std::collections::{BinaryHeap, HashMap, HashSet};
34use std::cmp::Ordering;
35use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
36use std::sync::Arc;
37use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
38
39use parking_lot::RwLock;
40
/// Point in time expressed as milliseconds elapsed since the Unix epoch.
pub type Timestamp = u64;
43
/// Retention rules controlling how long superseded versions and tombstones are kept.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Age limit for old versions; `None` keeps them forever.
    pub max_version_age: Option<Duration>,
    /// Upper bound on the number of versions kept for a single key.
    pub max_versions_per_key: usize,
    /// How long a tombstone must exist before it may be collected.
    pub tombstone_grace_period: Duration,
    /// How old a file must be before it becomes eligible for compaction.
    pub min_file_age: Duration,
}

impl Default for RetentionConfig {
    /// Defaults: versions expire after 7 days, at most 10 versions per key,
    /// a 24-hour tombstone grace period, and a 1-minute minimum file age.
    fn default() -> Self {
        const SECS_PER_MINUTE: u64 = 60;
        const SECS_PER_DAY: u64 = 24 * 60 * SECS_PER_MINUTE;

        let one_week = Duration::from_secs(7 * SECS_PER_DAY);
        let one_day = Duration::from_secs(SECS_PER_DAY);
        let one_minute = Duration::from_secs(SECS_PER_MINUTE);

        Self {
            max_version_age: Some(one_week),
            max_versions_per_key: 10,
            tombstone_grace_period: one_day,
            min_file_age: one_minute,
        }
    }
}
67
/// Compaction strategy.
///
/// `Default` is derived; the `#[default]` variant is [`CompactionStrategy::Leveled`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompactionStrategy {
    /// Leveled compaction (low SA, higher WA)
    #[default]
    Leveled,
    /// Universal compaction (low WA, higher SA)
    Universal,
    /// FIFO compaction (TTL-based, minimal overhead)
    Fifo,
    /// Tiered compaction (balanced WA/SA)
    Tiered,
}
86
87/// Compaction policy configuration
88#[derive(Debug, Clone)]
89pub struct CompactionConfig {
90    /// Compaction strategy
91    pub strategy: CompactionStrategy,
92    /// Retention configuration
93    pub retention: RetentionConfig,
94    /// Level 0 file limit before triggering compaction
95    pub l0_compaction_trigger: usize,
96    /// Maximum level 0 files before stopping writes
97    pub l0_stop_writes_trigger: usize,
98    /// Maximum bytes for level 1
99    pub max_bytes_for_level_base: u64,
100    /// Level multiplier (bytes_for_level[n+1] = multiplier * bytes_for_level[n])
101    pub max_bytes_for_level_multiplier: f64,
102    /// Target file size for base level
103    pub target_file_size_base: u64,
104    /// File size multiplier per level
105    pub target_file_size_multiplier: f64,
106    /// Maximum number of concurrent compactions
107    pub max_concurrent_compactions: usize,
108    /// Percentage of reads to sample for tombstone density
109    pub tombstone_sample_rate: f64,
110}
111
112impl Default for CompactionConfig {
113    fn default() -> Self {
114        Self {
115            strategy: CompactionStrategy::Leveled,
116            retention: RetentionConfig::default(),
117            l0_compaction_trigger: 4,
118            l0_stop_writes_trigger: 12,
119            max_bytes_for_level_base: 256 * 1024 * 1024, // 256 MB
120            max_bytes_for_level_multiplier: 10.0,
121            target_file_size_base: 64 * 1024 * 1024, // 64 MB
122            target_file_size_multiplier: 1.0,
123            max_concurrent_compactions: 4,
124            tombstone_sample_rate: 0.01,
125        }
126    }
127}
128
/// File metadata for compaction decisions
#[derive(Debug, Clone)]
pub struct CompactionFile {
    /// File ID
    pub id: u64,
    /// Level (0 = memtable flush, 1+ = compacted levels)
    pub level: u32,
    /// File size in bytes
    pub size: u64,
    /// Smallest key
    pub smallest_key: Vec<u8>,
    /// Largest key
    pub largest_key: Vec<u8>,
    /// Number of entries
    pub num_entries: u64,
    /// Number of deletions (tombstones)
    pub num_deletions: u64,
    /// Number of old versions (non-latest)
    pub num_old_versions: u64,
    /// Oldest entry timestamp
    pub oldest_timestamp: Timestamp,
    /// Newest entry timestamp
    pub newest_timestamp: Timestamp,
    /// Creation time
    pub created_at: Instant,
}

impl CompactionFile {
    /// Returns `count / num_entries` as a float, or `0.0` for a file with no
    /// entries (avoids a division by zero).
    fn fraction_of_entries(&self, count: u64) -> f64 {
        if self.num_entries == 0 {
            0.0
        } else {
            count as f64 / self.num_entries as f64
        }
    }

    /// Calculate tombstone density: deletions divided by total entries.
    ///
    /// Returns `0.0` when the file has no entries.
    pub fn tombstone_density(&self) -> f64 {
        self.fraction_of_entries(self.num_deletions)
    }

    /// Calculate version density: old (non-latest) versions divided by total entries.
    ///
    /// Returns `0.0` when the file has no entries.
    pub fn version_density(&self) -> f64 {
        self.fraction_of_entries(self.num_old_versions)
    }

    /// Calculate garbage ratio: (tombstones + old versions) divided by total entries.
    ///
    /// Returns `0.0` when the file has no entries. The two counters are added
    /// with saturation so that inconsistent metadata cannot trigger an
    /// arithmetic-overflow panic.
    pub fn garbage_ratio(&self) -> f64 {
        let garbage = self.num_deletions.saturating_add(self.num_old_versions);
        self.fraction_of_entries(garbage)
    }

    /// Check if the file's key range intersects `[smallest, largest]`.
    ///
    /// Both ranges are treated as inclusive and compared as byte strings.
    pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
        let starts_before_range_ends = self.smallest_key.as_slice() <= largest;
        let ends_after_range_starts = self.largest_key.as_slice() >= smallest;
        starts_before_range_ends && ends_after_range_starts
    }
}
189
190/// Compaction job description
191#[derive(Debug, Clone)]
192pub struct CompactionJob {
193    /// Job ID
194    pub id: u64,
195    /// Input files
196    pub inputs: Vec<CompactionFile>,
197    /// Target level
198    pub target_level: u32,
199    /// Priority (higher = more urgent)
200    pub priority: CompactionPriority,
201    /// Estimated output size
202    pub estimated_output_size: u64,
203    /// Reason for compaction
204    pub reason: CompactionReason,
205}
206
/// Urgency of a compaction job.
///
/// Variants are ordered by their discriminant, so `Low < Normal < High < Urgent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompactionPriority {
    /// Background maintenance.
    Low = 0,
    /// Regular compaction.
    Normal = 1,
    /// The L0 file count is getting high.
    High = 2,
    /// A write stall is approaching.
    Urgent = 3,
}
219
/// Why a compaction job was scheduled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionReason {
    /// The number of L0 files reached the trigger.
    L0FileCount,
    /// A level grew past its size limit.
    LevelSizeLimit,
    /// A file has a high tombstone density.
    TombstoneDensity,
    /// A file has a high density of old versions.
    VersionPruning,
    /// Data reached its TTL.
    TtlExpiration,
    /// Requested manually.
    Manual,
    /// Forced flush.
    ForcedFlush,
}
238
239/// Compaction picker - selects files for compaction
240pub trait CompactionPicker: Send + Sync {
241    /// Pick the next compaction job
242    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;
243
244    /// Calculate level targets
245    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
246}
247
248/// State of the LSM tree for compaction decisions
249#[derive(Debug, Default)]
250pub struct CompactionState {
251    /// Files by level
252    pub files_by_level: Vec<Vec<CompactionFile>>,
253    /// Total size by level
254    pub size_by_level: Vec<u64>,
255    /// Current timestamp for age calculations
256    pub current_time: Timestamp,
257    /// Oldest readable snapshot
258    pub oldest_snapshot: Timestamp,
259}
260
261impl CompactionState {
262    /// Get total number of L0 files
263    pub fn l0_file_count(&self) -> usize {
264        self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
265    }
266
267    /// Get level count
268    pub fn level_count(&self) -> usize {
269        self.files_by_level.len()
270    }
271
272    /// Find overlapping files in a level
273    pub fn find_overlapping(&self, level: usize, smallest: &[u8], largest: &[u8]) -> Vec<&CompactionFile> {
274        self.files_by_level
275            .get(level)
276            .map(|files| {
277                files.iter().filter(|f| f.overlaps(smallest, largest)).collect()
278            })
279            .unwrap_or_default()
280    }
281}
282
283/// Leveled compaction picker
284pub struct LeveledCompactionPicker {
285    config: CompactionConfig,
286    job_counter: AtomicU64,
287}
288
289impl LeveledCompactionPicker {
290    pub fn new(config: CompactionConfig) -> Self {
291        Self {
292            config,
293            job_counter: AtomicU64::new(0),
294        }
295    }
296
297    /// Pick L0 compaction
298    fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
299        let l0_files = state.files_by_level.get(0)?;
300        
301        if l0_files.len() < self.config.l0_compaction_trigger {
302            return None;
303        }
304
305        // All L0 files go to L1
306        let inputs: Vec<_> = l0_files.clone();
307        if inputs.is_empty() {
308            return None;
309        }
310
311        // Find overlapping L1 files
312        let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
313        let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
314
315        let l1_overlapping = state.find_overlapping(1, smallest, largest);
316        let mut all_inputs = inputs;
317        all_inputs.extend(l1_overlapping.into_iter().cloned());
318
319        let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
320            CompactionPriority::Urgent
321        } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
322            CompactionPriority::High
323        } else {
324            CompactionPriority::Normal
325        };
326
327        let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
328
329        Some(CompactionJob {
330            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
331            inputs: all_inputs,
332            target_level: 1,
333            priority,
334            estimated_output_size,
335            reason: CompactionReason::L0FileCount,
336        })
337    }
338
339    /// Pick level-to-level compaction based on size
340    fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
341        let targets = self.calculate_level_targets(state);
342
343        // Find level that most exceeds its target
344        let mut max_score = 0.0f64;
345        let mut pick_level = None;
346
347        for level in 1..state.level_count() {
348            if level >= targets.len() {
349                continue;
350            }
351            let target = targets[level];
352            if target == 0 {
353                continue;
354            }
355            let actual = state.size_by_level.get(level).copied().unwrap_or(0);
356            let score = actual as f64 / target as f64;
357            if score > max_score && score > 1.0 {
358                max_score = score;
359                pick_level = Some(level);
360            }
361        }
362
363        let level = pick_level?;
364        let files = state.files_by_level.get(level)?;
365
366        // Pick file with most garbage for version/tombstone pruning
367        let pick_file = files
368            .iter()
369            .max_by(|a, b| {
370                a.garbage_ratio()
371                    .partial_cmp(&b.garbage_ratio())
372                    .unwrap_or(Ordering::Equal)
373            })?
374            .clone();
375
376        // Find overlapping files in next level
377        let next_level = level + 1;
378        let overlapping = state.find_overlapping(
379            next_level,
380            &pick_file.smallest_key,
381            &pick_file.largest_key,
382        );
383
384        let mut inputs = vec![pick_file];
385        inputs.extend(overlapping.into_iter().cloned());
386
387        let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
388
389        Some(CompactionJob {
390            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
391            inputs,
392            target_level: next_level as u32,
393            priority: CompactionPriority::Normal,
394            estimated_output_size,
395            reason: if max_score > 2.0 {
396                CompactionReason::LevelSizeLimit
397            } else {
398                CompactionReason::VersionPruning
399            },
400        })
401    }
402
403    /// Pick tombstone-driven compaction
404    fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
405        // Find file with highest tombstone density
406        for level in 0..state.level_count() {
407            if let Some(files) = state.files_by_level.get(level) {
408                for file in files {
409                    // Compact if tombstone density > 50%
410                    if file.tombstone_density() > 0.5 {
411                        let overlapping = state.find_overlapping(
412                            level + 1,
413                            &file.smallest_key,
414                            &file.largest_key,
415                        );
416
417                        let mut inputs = vec![file.clone()];
418                        inputs.extend(overlapping.into_iter().cloned());
419
420                        return Some(CompactionJob {
421                            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
422                            inputs,
423                            target_level: (level + 1) as u32,
424                            priority: CompactionPriority::Normal,
425                            estimated_output_size: file.size / 2, // Estimate 50% reduction
426                            reason: CompactionReason::TombstoneDensity,
427                        });
428                    }
429                }
430            }
431        }
432
433        None
434    }
435}
436
437impl CompactionPicker for LeveledCompactionPicker {
438    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
439        // Priority order:
440        // 1. L0 compaction (avoid write stalls)
441        // 2. Tombstone-driven compaction (reclaim space)
442        // 3. Level size compaction
443
444        self.pick_l0_compaction(state)
445            .or_else(|| self.pick_tombstone_compaction(state))
446            .or_else(|| self.pick_level_compaction(state))
447    }
448
449    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
450        let mut targets = vec![0u64; state.level_count().max(7)];
451
452        // L0 doesn't have a size target, it has a file count trigger
453        targets[0] = 0;
454
455        // L1 is the base
456        targets[1] = self.config.max_bytes_for_level_base;
457
458        // Each subsequent level is multiplied
459        for level in 2..targets.len() {
460            targets[level] = (targets[level - 1] as f64
461                * self.config.max_bytes_for_level_multiplier) as u64;
462        }
463
464        targets
465    }
466}
467
468/// Universal compaction picker (for write-heavy workloads)
469pub struct UniversalCompactionPicker {
470    config: CompactionConfig,
471    job_counter: AtomicU64,
472}
473
474impl UniversalCompactionPicker {
475    pub fn new(config: CompactionConfig) -> Self {
476        Self {
477            config,
478            job_counter: AtomicU64::new(0),
479        }
480    }
481}
482
483impl CompactionPicker for UniversalCompactionPicker {
484    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
485        // Universal compaction: pick sorted runs to merge based on size ratios
486        let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
487
488        if all_files.len() < 2 {
489            return None;
490        }
491
492        // Sort by creation time (oldest first for FIFO-like behavior)
493        let mut sorted_files = all_files;
494        sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
495
496        // Pick adjacent runs where size ratio is acceptable
497        let size_ratio_threshold = 2.0;
498        let mut inputs = Vec::new();
499        let mut total_size = 0u64;
500
501        for file in sorted_files {
502            if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
503                total_size += file.size;
504                inputs.push(file);
505            } else {
506                break;
507            }
508        }
509
510        if inputs.len() < 2 {
511            return None;
512        }
513
514        Some(CompactionJob {
515            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
516            inputs,
517            target_level: 0, // Universal keeps everything at L0
518            priority: CompactionPriority::Normal,
519            estimated_output_size: total_size,
520            reason: CompactionReason::LevelSizeLimit,
521        })
522    }
523
524    fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
525        // Universal compaction doesn't use level targets
526        vec![]
527    }
528}
529
530/// Version pruning filter
531///
532/// Determines which versions can be safely garbage collected.
533pub struct VersionPruner {
534    /// Retention configuration
535    config: RetentionConfig,
536    /// Set of active snapshot timestamps
537    active_snapshots: RwLock<HashSet<Timestamp>>,
538    /// Current timestamp
539    current_time: AtomicU64,
540}
541
542impl VersionPruner {
543    pub fn new(config: RetentionConfig) -> Self {
544        let now = SystemTime::now()
545            .duration_since(UNIX_EPOCH)
546            .map(|d| d.as_millis() as u64)
547            .unwrap_or(0);
548
549        Self {
550            config,
551            active_snapshots: RwLock::new(HashSet::new()),
552            current_time: AtomicU64::new(now),
553        }
554    }
555
556    /// Register an active snapshot
557    pub fn register_snapshot(&self, timestamp: Timestamp) {
558        self.active_snapshots.write().insert(timestamp);
559    }
560
561    /// Unregister a snapshot
562    pub fn unregister_snapshot(&self, timestamp: Timestamp) {
563        self.active_snapshots.write().remove(&timestamp);
564    }
565
566    /// Get the oldest active snapshot
567    pub fn oldest_snapshot(&self) -> Option<Timestamp> {
568        self.active_snapshots.read().iter().min().copied()
569    }
570
571    /// Check if a version can be pruned
572    ///
573    /// A version can be pruned if:
574    /// 1. It's not the latest version
575    /// 2. It's older than max_version_age
576    /// 3. No active snapshot needs it
577    ///
578    /// A snapshot at time T needs all versions with timestamp <= T
579    /// (these are the versions that were visible at snapshot time)
580    pub fn can_prune_version(
581        &self,
582        version_timestamp: Timestamp,
583        is_latest: bool,
584        version_index: usize,
585    ) -> bool {
586        // Never prune the latest version
587        if is_latest {
588            return false;
589        }
590
591        // Check if any snapshot protects this version
592        // A snapshot at time T protects versions with timestamp <= T
593        if let Some(oldest) = self.oldest_snapshot() {
594            if version_timestamp <= oldest {
595                // Version was visible to the oldest snapshot, cannot prune
596                return false;
597            }
598        }
599
600        // Check max versions per key
601        if version_index >= self.config.max_versions_per_key {
602            return true;
603        }
604
605        // Check age-based pruning
606        if let Some(max_age) = self.config.max_version_age {
607            let now = self.current_time.load(AtomicOrdering::Relaxed);
608            let age_ms = now.saturating_sub(version_timestamp);
609            let max_age_ms = max_age.as_millis() as u64;
610
611            if age_ms > max_age_ms {
612                return true;
613            }
614        }
615
616        false
617    }
618
619    /// Check if a tombstone can be collected
620    ///
621    /// A tombstone can be collected only if:
622    /// 1. It's older than the grace period
623    /// 2. No active snapshot needs to see it
624    pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
625        let now = self.current_time.load(AtomicOrdering::Relaxed);
626        let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
627
628        if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
629            return false;
630        }
631
632        // Check if any snapshot protects this tombstone
633        if let Some(oldest) = self.oldest_snapshot() {
634            if tombstone_timestamp <= oldest {
635                // Tombstone was visible to the oldest snapshot, cannot collect
636                return false;
637            }
638        }
639
640        true
641    }
642
643    /// Update current time
644    pub fn update_time(&self) {
645        let now = SystemTime::now()
646            .duration_since(UNIX_EPOCH)
647            .map(|d| d.as_millis() as u64)
648            .unwrap_or(0);
649        self.current_time.store(now, AtomicOrdering::Relaxed);
650    }
651}
652
/// Counters and amplification factors describing compaction activity.
///
/// `Default` yields all-zero statistics.
#[derive(Debug, Default, Clone)]
pub struct CompactionStats {
    /// How many compactions have completed.
    pub compactions_completed: u64,
    /// Total bytes read during compaction.
    pub bytes_read: u64,
    /// Total bytes written during compaction.
    pub bytes_written: u64,
    /// Number of entries processed.
    pub entries_processed: u64,
    /// Number of tombstones collected.
    pub tombstones_collected: u64,
    /// Number of old versions pruned.
    pub versions_pruned: u64,
    /// Write amplification factor.
    pub write_amplification: f64,
    /// Space amplification factor.
    pub space_amplification: f64,
}
673
674// =============================================================================
675// Tests
676// =============================================================================
677
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 1000-entry file with no tombstones and no old versions.
    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
        CompactionFile {
            id,
            level,
            size,
            smallest_key: smallest.as_bytes().to_vec(),
            largest_key: largest.as_bytes().to_vec(),
            num_entries: 1000,
            num_deletions: 0,
            num_old_versions: 0,
            oldest_timestamp: 0,
            newest_timestamp: 100,
            created_at: Instant::now(),
        }
    }

    #[test]
    fn test_leveled_picker_l0() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            l0_compaction_trigger: 4,
            ..Default::default()
        });

        let state = CompactionState {
            files_by_level: vec![
                vec![
                    make_file(1, 0, 1000, "a", "d"),
                    make_file(2, 0, 1000, "c", "f"),
                    make_file(3, 0, 1000, "e", "h"),
                    make_file(4, 0, 1000, "g", "j"),
                ],
                vec![make_file(10, 1, 10000, "a", "z")],
            ],
            ..Default::default()
        };

        let job = picker
            .pick_compaction(&state)
            .expect("L0 trigger is reached, so a job must be picked");

        assert_eq!(job.target_level, 1);
        // All four L0 files plus the single overlapping L1 file.
        assert_eq!(job.inputs.len(), 5);
        assert_eq!(job.estimated_output_size, 14_000);
        // Exactly at the trigger: neither High (2x trigger) nor Urgent.
        assert_eq!(job.priority, CompactionPriority::Normal);
        assert_eq!(job.reason, CompactionReason::L0FileCount);
    }

    #[test]
    fn test_leveled_picker_below_trigger() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            l0_compaction_trigger: 4,
            ..Default::default()
        });

        // Three L0 files, no tombstones, and no level over its size target.
        let state = CompactionState {
            files_by_level: vec![
                vec![
                    make_file(1, 0, 1000, "a", "d"),
                    make_file(2, 0, 1000, "c", "f"),
                    make_file(3, 0, 1000, "e", "h"),
                ],
                vec![make_file(10, 1, 10000, "a", "z")],
            ],
            ..Default::default()
        };

        assert!(picker.pick_compaction(&state).is_none());
    }

    #[test]
    fn test_tombstone_density() {
        let mut file = make_file(1, 0, 1000, "a", "z");
        file.num_entries = 100;
        file.num_deletions = 60;

        assert!(file.tombstone_density() > 0.5);
    }

    #[test]
    fn test_ratios_of_empty_file() {
        let mut file = make_file(1, 0, 0, "a", "z");
        file.num_entries = 0;
        file.num_deletions = 5;
        file.num_old_versions = 5;

        // A file without entries reports no garbage instead of dividing by zero.
        assert_eq!(file.tombstone_density(), 0.0);
        assert_eq!(file.version_density(), 0.0);
        assert_eq!(file.garbage_ratio(), 0.0);
    }

    #[test]
    fn test_version_pruner() {
        let config = RetentionConfig {
            max_version_age: Some(Duration::from_secs(3600)),
            max_versions_per_key: 5,
            tombstone_grace_period: Duration::from_secs(60),
            min_file_age: Duration::from_secs(60),
        };

        let pruner = VersionPruner::new(config);

        // Latest version should never be pruned
        assert!(!pruner.can_prune_version(0, true, 0));

        // Version beyond max_versions_per_key can be pruned (if no snapshots)
        assert!(pruner.can_prune_version(0, false, 10));

        // Register a snapshot at timestamp 1000
        let snapshot_ts = 1000;
        pruner.register_snapshot(snapshot_ts);
        assert_eq!(pruner.oldest_snapshot(), Some(snapshot_ts));

        // Version with timestamp 500 (before snapshot) is protected by the snapshot
        // because the snapshot might need to read it
        assert!(!pruner.can_prune_version(500, false, 10));

        // Version with timestamp 1500 (after snapshot) can be pruned
        // because it was written after the snapshot was taken
        assert!(pruner.can_prune_version(1500, false, 10));

        // Releasing the snapshot lifts the protection again
        pruner.unregister_snapshot(snapshot_ts);
        assert_eq!(pruner.oldest_snapshot(), None);
        assert!(pruner.can_prune_version(500, false, 10));
    }

    #[test]
    fn test_tombstone_collection() {
        let pruner = VersionPruner::new(RetentionConfig {
            tombstone_grace_period: Duration::from_secs(60),
            ..Default::default()
        });

        // A tombstone stamped in the far future has age 0: still in its grace period
        assert!(!pruner.can_collect_tombstone(u64::MAX));

        // A tombstone visible to an active snapshot is never collected
        pruner.register_snapshot(1000);
        assert!(!pruner.can_collect_tombstone(500));
    }

    #[test]
    fn test_level_targets() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            max_bytes_for_level_base: 256 * 1024 * 1024,
            max_bytes_for_level_multiplier: 10.0,
            ..Default::default()
        });

        let state = CompactionState {
            files_by_level: vec![vec![], vec![], vec![], vec![]],
            size_by_level: vec![0, 0, 0, 0],
            current_time: 0,
            oldest_snapshot: 0,
        };

        let targets = picker.calculate_level_targets(&state);

        // Targets are always produced for at least seven levels
        assert_eq!(targets.len(), 7);
        assert_eq!(targets[0], 0); // L0 has no size target
        assert_eq!(targets[1], 256 * 1024 * 1024);
        assert_eq!(targets[2], 2560 * 1024 * 1024); // 10x L1
    }

    #[test]
    fn test_compaction_priority() {
        assert!(CompactionPriority::Urgent > CompactionPriority::High);
        assert!(CompactionPriority::High > CompactionPriority::Normal);
        assert!(CompactionPriority::Normal > CompactionPriority::Low);
    }

    #[test]
    fn test_file_overlaps() {
        let file = make_file(1, 0, 1000, "d", "h");

        assert!(file.overlaps(b"a", b"e")); // Overlaps start
        assert!(file.overlaps(b"f", b"z")); // Overlaps end
        assert!(file.overlaps(b"e", b"g")); // Contained
        assert!(file.overlaps(b"a", b"z")); // Contains file
        assert!(!file.overlaps(b"a", b"c")); // Before
        assert!(!file.overlaps(b"i", b"z")); // After
    }
}