Skip to main content

sochdb_storage/
compaction_policy.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Compaction Policy for Version and Tombstone Pruning
19//!
20//! This module provides intelligent compaction policies that:
21//! - Aggressively prune old versions beyond retention window
22//! - Collect tombstones that have expired
23//! - Minimize write amplification (WA) and space amplification (SA)
24//! - Balance between read performance and compaction overhead
25//!
26//! ## Compaction Strategies
27//!
28//! 1. **Leveled Compaction**: Traditional RocksDB-style L0 → L1 → ... compaction
29//! 2. **Universal Compaction**: Minimize WA for write-heavy workloads
30//! 3. **FIFO Compaction**: Age-based TTL with minimal overhead
31//! 4. **Tiered Compaction**: Hybrid approach balancing WA and SA
32
33use std::cmp::Ordering;
34use std::collections::HashSet;
35use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
36use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
37
38use parking_lot::RwLock;
39
40/// Timestamp type (milliseconds since epoch)
41pub type Timestamp = u64;
42
43/// Version retention configuration
44#[derive(Debug, Clone)]
45pub struct RetentionConfig {
46    /// Maximum age for old versions (None = keep forever)
47    pub max_version_age: Option<Duration>,
48    /// Maximum number of versions to keep per key
49    pub max_versions_per_key: usize,
50    /// Tombstone grace period before collection
51    pub tombstone_grace_period: Duration,
52    /// Minimum age before a file is eligible for compaction
53    pub min_file_age: Duration,
54}
55
56impl Default for RetentionConfig {
57    fn default() -> Self {
58        Self {
59            max_version_age: Some(Duration::from_secs(7 * 24 * 60 * 60)), // 7 days
60            max_versions_per_key: 10,
61            tombstone_grace_period: Duration::from_secs(24 * 60 * 60), // 24 hours
62            min_file_age: Duration::from_secs(60),                     // 1 minute
63        }
64    }
65}
66
67/// Compaction strategy
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum CompactionStrategy {
70    /// Leveled compaction (low SA, higher WA)
71    Leveled,
72    /// Universal compaction (low WA, higher SA)
73    Universal,
74    /// FIFO compaction (TTL-based, minimal overhead)
75    Fifo,
76    /// Tiered compaction (balanced WA/SA)
77    Tiered,
78}
79
80impl Default for CompactionStrategy {
81    fn default() -> Self {
82        Self::Leveled
83    }
84}
85
86/// Compaction policy configuration
87#[derive(Debug, Clone)]
88pub struct CompactionConfig {
89    /// Compaction strategy
90    pub strategy: CompactionStrategy,
91    /// Retention configuration
92    pub retention: RetentionConfig,
93    /// Level 0 file limit before triggering compaction
94    pub l0_compaction_trigger: usize,
95    /// Maximum level 0 files before stopping writes
96    pub l0_stop_writes_trigger: usize,
97    /// Maximum bytes for level 1
98    pub max_bytes_for_level_base: u64,
99    /// Level multiplier (bytes_for_level[n+1] = multiplier * bytes_for_level[n])
100    pub max_bytes_for_level_multiplier: f64,
101    /// Target file size for base level
102    pub target_file_size_base: u64,
103    /// File size multiplier per level
104    pub target_file_size_multiplier: f64,
105    /// Maximum number of concurrent compactions
106    pub max_concurrent_compactions: usize,
107    /// Percentage of reads to sample for tombstone density
108    pub tombstone_sample_rate: f64,
109}
110
111impl Default for CompactionConfig {
112    fn default() -> Self {
113        Self {
114            strategy: CompactionStrategy::Leveled,
115            retention: RetentionConfig::default(),
116            l0_compaction_trigger: 4,
117            l0_stop_writes_trigger: 12,
118            max_bytes_for_level_base: 256 * 1024 * 1024, // 256 MB
119            max_bytes_for_level_multiplier: 10.0,
120            target_file_size_base: 64 * 1024 * 1024, // 64 MB
121            target_file_size_multiplier: 1.0,
122            max_concurrent_compactions: 4,
123            tombstone_sample_rate: 0.01,
124        }
125    }
126}
127
128/// File metadata for compaction decisions
129#[derive(Debug, Clone)]
130pub struct CompactionFile {
131    /// File ID
132    pub id: u64,
133    /// Level (0 = memtable flush, 1+ = compacted levels)
134    pub level: u32,
135    /// File size in bytes
136    pub size: u64,
137    /// Smallest key
138    pub smallest_key: Vec<u8>,
139    /// Largest key
140    pub largest_key: Vec<u8>,
141    /// Number of entries
142    pub num_entries: u64,
143    /// Number of deletions (tombstones)
144    pub num_deletions: u64,
145    /// Number of old versions (non-latest)
146    pub num_old_versions: u64,
147    /// Oldest entry timestamp
148    pub oldest_timestamp: Timestamp,
149    /// Newest entry timestamp
150    pub newest_timestamp: Timestamp,
151    /// Creation time
152    pub created_at: Instant,
153}
154
155impl CompactionFile {
156    /// Calculate tombstone density
157    pub fn tombstone_density(&self) -> f64 {
158        if self.num_entries == 0 {
159            0.0
160        } else {
161            self.num_deletions as f64 / self.num_entries as f64
162        }
163    }
164
165    /// Calculate version density (ratio of old versions)
166    pub fn version_density(&self) -> f64 {
167        if self.num_entries == 0 {
168            0.0
169        } else {
170            self.num_old_versions as f64 / self.num_entries as f64
171        }
172    }
173
174    /// Calculate garbage ratio (tombstones + old versions)
175    pub fn garbage_ratio(&self) -> f64 {
176        if self.num_entries == 0 {
177            0.0
178        } else {
179            (self.num_deletions + self.num_old_versions) as f64 / self.num_entries as f64
180        }
181    }
182
183    /// Check if file overlaps with key range
184    pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
185        self.smallest_key.as_slice() <= largest && self.largest_key.as_slice() >= smallest
186    }
187}
188
189/// Compaction job description
190#[derive(Debug, Clone)]
191pub struct CompactionJob {
192    /// Job ID
193    pub id: u64,
194    /// Input files
195    pub inputs: Vec<CompactionFile>,
196    /// Target level
197    pub target_level: u32,
198    /// Priority (higher = more urgent)
199    pub priority: CompactionPriority,
200    /// Estimated output size
201    pub estimated_output_size: u64,
202    /// Reason for compaction
203    pub reason: CompactionReason,
204}
205
206/// Compaction priority
207#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
208pub enum CompactionPriority {
209    /// Low priority - background maintenance
210    Low = 0,
211    /// Normal priority - regular compaction
212    Normal = 1,
213    /// High priority - L0 file count is getting high
214    High = 2,
215    /// Urgent - approaching write stall
216    Urgent = 3,
217}
218
219/// Reason for triggering compaction
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221pub enum CompactionReason {
222    /// L0 file count trigger
223    L0FileCount,
224    /// Level size limit exceeded
225    LevelSizeLimit,
226    /// High tombstone density
227    TombstoneDensity,
228    /// High old version density
229    VersionPruning,
230    /// TTL expiration
231    TtlExpiration,
232    /// Manual trigger
233    Manual,
234    /// Forced flush
235    ForcedFlush,
236}
237
238/// Compaction picker - selects files for compaction
239pub trait CompactionPicker: Send + Sync {
240    /// Pick the next compaction job
241    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;
242
243    /// Calculate level targets
244    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
245}
246
247/// State of the LSM tree for compaction decisions
248#[derive(Debug, Default)]
249pub struct CompactionState {
250    /// Files by level
251    pub files_by_level: Vec<Vec<CompactionFile>>,
252    /// Total size by level
253    pub size_by_level: Vec<u64>,
254    /// Current timestamp for age calculations
255    pub current_time: Timestamp,
256    /// Oldest readable snapshot
257    pub oldest_snapshot: Timestamp,
258}
259
260impl CompactionState {
261    /// Get total number of L0 files
262    pub fn l0_file_count(&self) -> usize {
263        self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
264    }
265
266    /// Get level count
267    pub fn level_count(&self) -> usize {
268        self.files_by_level.len()
269    }
270
271    /// Find overlapping files in a level
272    pub fn find_overlapping(
273        &self,
274        level: usize,
275        smallest: &[u8],
276        largest: &[u8],
277    ) -> Vec<&CompactionFile> {
278        self.files_by_level
279            .get(level)
280            .map(|files| {
281                files
282                    .iter()
283                    .filter(|f| f.overlaps(smallest, largest))
284                    .collect()
285            })
286            .unwrap_or_default()
287    }
288}
289
290/// Leveled compaction picker
291pub struct LeveledCompactionPicker {
292    config: CompactionConfig,
293    job_counter: AtomicU64,
294}
295
296impl LeveledCompactionPicker {
297    pub fn new(config: CompactionConfig) -> Self {
298        Self {
299            config,
300            job_counter: AtomicU64::new(0),
301        }
302    }
303
304    /// Pick L0 compaction
305    fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
306        let l0_files = state.files_by_level.get(0)?;
307
308        if l0_files.len() < self.config.l0_compaction_trigger {
309            return None;
310        }
311
312        // All L0 files go to L1
313        let inputs: Vec<_> = l0_files.clone();
314        if inputs.is_empty() {
315            return None;
316        }
317
318        // Find overlapping L1 files
319        let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
320        let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
321
322        let l1_overlapping = state.find_overlapping(1, smallest, largest);
323        let mut all_inputs = inputs;
324        all_inputs.extend(l1_overlapping.into_iter().cloned());
325
326        let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
327            CompactionPriority::Urgent
328        } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
329            CompactionPriority::High
330        } else {
331            CompactionPriority::Normal
332        };
333
334        let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
335
336        Some(CompactionJob {
337            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
338            inputs: all_inputs,
339            target_level: 1,
340            priority,
341            estimated_output_size,
342            reason: CompactionReason::L0FileCount,
343        })
344    }
345
346    /// Pick level-to-level compaction based on size
347    fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
348        let targets = self.calculate_level_targets(state);
349
350        // Find level that most exceeds its target
351        let mut max_score = 0.0f64;
352        let mut pick_level = None;
353
354        for level in 1..state.level_count() {
355            if level >= targets.len() {
356                continue;
357            }
358            let target = targets[level];
359            if target == 0 {
360                continue;
361            }
362            let actual = state.size_by_level.get(level).copied().unwrap_or(0);
363            let score = actual as f64 / target as f64;
364            if score > max_score && score > 1.0 {
365                max_score = score;
366                pick_level = Some(level);
367            }
368        }
369
370        let level = pick_level?;
371        let files = state.files_by_level.get(level)?;
372
373        // Pick file with most garbage for version/tombstone pruning
374        let pick_file = files
375            .iter()
376            .max_by(|a, b| {
377                a.garbage_ratio()
378                    .partial_cmp(&b.garbage_ratio())
379                    .unwrap_or(Ordering::Equal)
380            })?
381            .clone();
382
383        // Find overlapping files in next level
384        let next_level = level + 1;
385        let overlapping =
386            state.find_overlapping(next_level, &pick_file.smallest_key, &pick_file.largest_key);
387
388        let mut inputs = vec![pick_file];
389        inputs.extend(overlapping.into_iter().cloned());
390
391        let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
392
393        Some(CompactionJob {
394            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
395            inputs,
396            target_level: next_level as u32,
397            priority: CompactionPriority::Normal,
398            estimated_output_size,
399            reason: if max_score > 2.0 {
400                CompactionReason::LevelSizeLimit
401            } else {
402                CompactionReason::VersionPruning
403            },
404        })
405    }
406
407    /// Pick tombstone-driven compaction
408    fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
409        // Find file with highest tombstone density
410        for level in 0..state.level_count() {
411            if let Some(files) = state.files_by_level.get(level) {
412                for file in files {
413                    // Compact if tombstone density > 50%
414                    if file.tombstone_density() > 0.5 {
415                        let overlapping = state.find_overlapping(
416                            level + 1,
417                            &file.smallest_key,
418                            &file.largest_key,
419                        );
420
421                        let mut inputs = vec![file.clone()];
422                        inputs.extend(overlapping.into_iter().cloned());
423
424                        return Some(CompactionJob {
425                            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
426                            inputs,
427                            target_level: (level + 1) as u32,
428                            priority: CompactionPriority::Normal,
429                            estimated_output_size: file.size / 2, // Estimate 50% reduction
430                            reason: CompactionReason::TombstoneDensity,
431                        });
432                    }
433                }
434            }
435        }
436
437        None
438    }
439}
440
441impl CompactionPicker for LeveledCompactionPicker {
442    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
443        // Priority order:
444        // 1. L0 compaction (avoid write stalls)
445        // 2. Tombstone-driven compaction (reclaim space)
446        // 3. Level size compaction
447
448        self.pick_l0_compaction(state)
449            .or_else(|| self.pick_tombstone_compaction(state))
450            .or_else(|| self.pick_level_compaction(state))
451    }
452
453    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
454        let mut targets = vec![0u64; state.level_count().max(7)];
455
456        // L0 doesn't have a size target, it has a file count trigger
457        targets[0] = 0;
458
459        // L1 is the base
460        targets[1] = self.config.max_bytes_for_level_base;
461
462        // Each subsequent level is multiplied
463        for level in 2..targets.len() {
464            targets[level] =
465                (targets[level - 1] as f64 * self.config.max_bytes_for_level_multiplier) as u64;
466        }
467
468        targets
469    }
470}
471
472/// Universal compaction picker (for write-heavy workloads)
473pub struct UniversalCompactionPicker {
474    config: CompactionConfig,
475    job_counter: AtomicU64,
476}
477
478impl UniversalCompactionPicker {
479    pub fn new(config: CompactionConfig) -> Self {
480        Self {
481            config,
482            job_counter: AtomicU64::new(0),
483        }
484    }
485}
486
487impl CompactionPicker for UniversalCompactionPicker {
488    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
489        // Universal compaction: pick sorted runs to merge based on size ratios
490        let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
491
492        if all_files.len() < 2 {
493            return None;
494        }
495
496        // Sort by creation time (oldest first for FIFO-like behavior)
497        let mut sorted_files = all_files;
498        sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
499
500        // Pick adjacent runs where size ratio is acceptable
501        let size_ratio_threshold = 2.0;
502        let mut inputs = Vec::new();
503        let mut total_size = 0u64;
504
505        for file in sorted_files {
506            if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
507                total_size += file.size;
508                inputs.push(file);
509            } else {
510                break;
511            }
512        }
513
514        if inputs.len() < 2 {
515            return None;
516        }
517
518        Some(CompactionJob {
519            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
520            inputs,
521            target_level: 0, // Universal keeps everything at L0
522            priority: CompactionPriority::Normal,
523            estimated_output_size: total_size,
524            reason: CompactionReason::LevelSizeLimit,
525        })
526    }
527
528    fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
529        // Universal compaction doesn't use level targets
530        vec![]
531    }
532}
533
534/// Version pruning filter
535///
536/// Determines which versions can be safely garbage collected.
537pub struct VersionPruner {
538    /// Retention configuration
539    config: RetentionConfig,
540    /// Set of active snapshot timestamps
541    active_snapshots: RwLock<HashSet<Timestamp>>,
542    /// Current timestamp
543    current_time: AtomicU64,
544}
545
546impl VersionPruner {
547    pub fn new(config: RetentionConfig) -> Self {
548        let now = SystemTime::now()
549            .duration_since(UNIX_EPOCH)
550            .map(|d| d.as_millis() as u64)
551            .unwrap_or(0);
552
553        Self {
554            config,
555            active_snapshots: RwLock::new(HashSet::new()),
556            current_time: AtomicU64::new(now),
557        }
558    }
559
560    /// Register an active snapshot
561    pub fn register_snapshot(&self, timestamp: Timestamp) {
562        self.active_snapshots.write().insert(timestamp);
563    }
564
565    /// Unregister a snapshot
566    pub fn unregister_snapshot(&self, timestamp: Timestamp) {
567        self.active_snapshots.write().remove(&timestamp);
568    }
569
570    /// Get the oldest active snapshot
571    pub fn oldest_snapshot(&self) -> Option<Timestamp> {
572        self.active_snapshots.read().iter().min().copied()
573    }
574
575    /// Check if a version can be pruned
576    ///
577    /// A version can be pruned if:
578    /// 1. It's not the latest version
579    /// 2. It's older than max_version_age
580    /// 3. No active snapshot needs it
581    ///
582    /// A snapshot at time T needs all versions with timestamp <= T
583    /// (these are the versions that were visible at snapshot time)
584    pub fn can_prune_version(
585        &self,
586        version_timestamp: Timestamp,
587        is_latest: bool,
588        version_index: usize,
589    ) -> bool {
590        // Never prune the latest version
591        if is_latest {
592            return false;
593        }
594
595        // Check if any snapshot protects this version
596        // A snapshot at time T protects versions with timestamp <= T
597        if let Some(oldest) = self.oldest_snapshot() {
598            if version_timestamp <= oldest {
599                // Version was visible to the oldest snapshot, cannot prune
600                return false;
601            }
602        }
603
604        // Check max versions per key
605        if version_index >= self.config.max_versions_per_key {
606            return true;
607        }
608
609        // Check age-based pruning
610        if let Some(max_age) = self.config.max_version_age {
611            let now = self.current_time.load(AtomicOrdering::Relaxed);
612            let age_ms = now.saturating_sub(version_timestamp);
613            let max_age_ms = max_age.as_millis() as u64;
614
615            if age_ms > max_age_ms {
616                return true;
617            }
618        }
619
620        false
621    }
622
623    /// Check if a tombstone can be collected
624    ///
625    /// A tombstone can be collected only if:
626    /// 1. It's older than the grace period
627    /// 2. No active snapshot needs to see it
628    pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
629        let now = self.current_time.load(AtomicOrdering::Relaxed);
630        let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
631
632        if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
633            return false;
634        }
635
636        // Check if any snapshot protects this tombstone
637        if let Some(oldest) = self.oldest_snapshot() {
638            if tombstone_timestamp <= oldest {
639                // Tombstone was visible to the oldest snapshot, cannot collect
640                return false;
641            }
642        }
643
644        true
645    }
646
647    /// Update current time
648    pub fn update_time(&self) {
649        let now = SystemTime::now()
650            .duration_since(UNIX_EPOCH)
651            .map(|d| d.as_millis() as u64)
652            .unwrap_or(0);
653        self.current_time.store(now, AtomicOrdering::Relaxed);
654    }
655}
656
657/// Compaction statistics
658#[derive(Debug, Default, Clone)]
659pub struct CompactionStats {
660    /// Number of compactions completed
661    pub compactions_completed: u64,
662    /// Total bytes read during compaction
663    pub bytes_read: u64,
664    /// Total bytes written during compaction
665    pub bytes_written: u64,
666    /// Entries processed
667    pub entries_processed: u64,
668    /// Tombstones collected
669    pub tombstones_collected: u64,
670    /// Old versions pruned
671    pub versions_pruned: u64,
672    /// Write amplification factor
673    pub write_amplification: f64,
674    /// Space amplification factor
675    pub space_amplification: f64,
676}
677
678// =============================================================================
679// Tests
680// =============================================================================
681
682#[cfg(test)]
683mod tests {
684    use super::*;
685
686    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
687        CompactionFile {
688            id,
689            level,
690            size,
691            smallest_key: smallest.as_bytes().to_vec(),
692            largest_key: largest.as_bytes().to_vec(),
693            num_entries: 1000,
694            num_deletions: 0,
695            num_old_versions: 0,
696            oldest_timestamp: 0,
697            newest_timestamp: 100,
698            created_at: Instant::now(),
699        }
700    }
701
702    #[test]
703    fn test_leveled_picker_l0() {
704        let picker = LeveledCompactionPicker::new(CompactionConfig {
705            l0_compaction_trigger: 4,
706            ..Default::default()
707        });
708
709        let mut state = CompactionState::default();
710        state.files_by_level = vec![
711            vec![
712                make_file(1, 0, 1000, "a", "d"),
713                make_file(2, 0, 1000, "c", "f"),
714                make_file(3, 0, 1000, "e", "h"),
715                make_file(4, 0, 1000, "g", "j"),
716            ],
717            vec![make_file(10, 1, 10000, "a", "z")],
718        ];
719
720        let job = picker.pick_compaction(&state);
721        assert!(job.is_some());
722
723        let job = job.unwrap();
724        assert_eq!(job.target_level, 1);
725        assert!(job.inputs.len() >= 4); // All L0 files + overlapping L1
726    }
727
728    #[test]
729    fn test_tombstone_density() {
730        let mut file = make_file(1, 0, 1000, "a", "z");
731        file.num_entries = 100;
732        file.num_deletions = 60;
733
734        assert!(file.tombstone_density() > 0.5);
735    }
736
737    #[test]
738    fn test_version_pruner() {
739        let config = RetentionConfig {
740            max_version_age: Some(Duration::from_secs(3600)),
741            max_versions_per_key: 5,
742            tombstone_grace_period: Duration::from_secs(60),
743            min_file_age: Duration::from_secs(60),
744        };
745
746        let pruner = VersionPruner::new(config);
747
748        // Latest version should never be pruned
749        assert!(!pruner.can_prune_version(0, true, 0));
750
751        // Version beyond max_versions_per_key can be pruned (if no snapshots)
752        assert!(pruner.can_prune_version(0, false, 10));
753
754        // Register a snapshot at timestamp 1000
755        let snapshot_ts = 1000;
756        pruner.register_snapshot(snapshot_ts);
757
758        // Version with timestamp 500 (before snapshot) is protected by the snapshot
759        // because the snapshot might need to read it
760        assert!(!pruner.can_prune_version(500, false, 10));
761
762        // Version with timestamp 1500 (after snapshot) can be pruned
763        // because it was written after the snapshot was taken
764        assert!(pruner.can_prune_version(1500, false, 10));
765    }
766
767    #[test]
768    fn test_level_targets() {
769        let picker = LeveledCompactionPicker::new(CompactionConfig {
770            max_bytes_for_level_base: 256 * 1024 * 1024,
771            max_bytes_for_level_multiplier: 10.0,
772            ..Default::default()
773        });
774
775        let state = CompactionState {
776            files_by_level: vec![vec![], vec![], vec![], vec![]],
777            size_by_level: vec![0, 0, 0, 0],
778            current_time: 0,
779            oldest_snapshot: 0,
780        };
781
782        let targets = picker.calculate_level_targets(&state);
783
784        assert_eq!(targets[0], 0); // L0 has no size target
785        assert_eq!(targets[1], 256 * 1024 * 1024);
786        assert_eq!(targets[2], 2560 * 1024 * 1024); // 10x L1
787    }
788
789    #[test]
790    fn test_compaction_priority() {
791        assert!(CompactionPriority::Urgent > CompactionPriority::High);
792        assert!(CompactionPriority::High > CompactionPriority::Normal);
793        assert!(CompactionPriority::Normal > CompactionPriority::Low);
794    }
795
796    #[test]
797    fn test_file_overlaps() {
798        let file = make_file(1, 0, 1000, "d", "h");
799
800        assert!(file.overlaps(b"a", b"e")); // Overlaps start
801        assert!(file.overlaps(b"f", b"z")); // Overlaps end
802        assert!(file.overlaps(b"e", b"g")); // Contained
803        assert!(file.overlaps(b"a", b"z")); // Contains file
804        assert!(!file.overlaps(b"a", b"c")); // Before
805        assert!(!file.overlaps(b"i", b"z")); // After
806    }
807}