sochdb_storage/
compaction_policy.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Compaction Policy for Version and Tombstone Pruning
16//!
17//! This module provides intelligent compaction policies that:
18//! - Aggressively prune old versions beyond retention window
19//! - Collect tombstones that have expired
20//! - Minimize write amplification (WA) and space amplification (SA)
21//! - Balance between read performance and compaction overhead
22//!
23//! ## Compaction Strategies
24//!
25//! 1. **Leveled Compaction**: Traditional RocksDB-style L0 → L1 → ... compaction
26//! 2. **Universal Compaction**: Minimize WA for write-heavy workloads
27//! 3. **FIFO Compaction**: Age-based TTL with minimal overhead
28//! 4. **Tiered Compaction**: Hybrid approach balancing WA and SA
29
30use std::collections::{BinaryHeap, HashMap, HashSet};
31use std::cmp::Ordering;
32use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
36use parking_lot::RwLock;
37
38/// Timestamp type (milliseconds since epoch)
39pub type Timestamp = u64;
40
41/// Version retention configuration
42#[derive(Debug, Clone)]
43pub struct RetentionConfig {
44    /// Maximum age for old versions (None = keep forever)
45    pub max_version_age: Option<Duration>,
46    /// Maximum number of versions to keep per key
47    pub max_versions_per_key: usize,
48    /// Tombstone grace period before collection
49    pub tombstone_grace_period: Duration,
50    /// Minimum age before a file is eligible for compaction
51    pub min_file_age: Duration,
52}
53
54impl Default for RetentionConfig {
55    fn default() -> Self {
56        Self {
57            max_version_age: Some(Duration::from_secs(7 * 24 * 60 * 60)), // 7 days
58            max_versions_per_key: 10,
59            tombstone_grace_period: Duration::from_secs(24 * 60 * 60), // 24 hours
60            min_file_age: Duration::from_secs(60), // 1 minute
61        }
62    }
63}
64
65/// Compaction strategy
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum CompactionStrategy {
68    /// Leveled compaction (low SA, higher WA)
69    Leveled,
70    /// Universal compaction (low WA, higher SA)
71    Universal,
72    /// FIFO compaction (TTL-based, minimal overhead)
73    Fifo,
74    /// Tiered compaction (balanced WA/SA)
75    Tiered,
76}
77
78impl Default for CompactionStrategy {
79    fn default() -> Self {
80        Self::Leveled
81    }
82}
83
84/// Compaction policy configuration
85#[derive(Debug, Clone)]
86pub struct CompactionConfig {
87    /// Compaction strategy
88    pub strategy: CompactionStrategy,
89    /// Retention configuration
90    pub retention: RetentionConfig,
91    /// Level 0 file limit before triggering compaction
92    pub l0_compaction_trigger: usize,
93    /// Maximum level 0 files before stopping writes
94    pub l0_stop_writes_trigger: usize,
95    /// Maximum bytes for level 1
96    pub max_bytes_for_level_base: u64,
97    /// Level multiplier (bytes_for_level[n+1] = multiplier * bytes_for_level[n])
98    pub max_bytes_for_level_multiplier: f64,
99    /// Target file size for base level
100    pub target_file_size_base: u64,
101    /// File size multiplier per level
102    pub target_file_size_multiplier: f64,
103    /// Maximum number of concurrent compactions
104    pub max_concurrent_compactions: usize,
105    /// Percentage of reads to sample for tombstone density
106    pub tombstone_sample_rate: f64,
107}
108
109impl Default for CompactionConfig {
110    fn default() -> Self {
111        Self {
112            strategy: CompactionStrategy::Leveled,
113            retention: RetentionConfig::default(),
114            l0_compaction_trigger: 4,
115            l0_stop_writes_trigger: 12,
116            max_bytes_for_level_base: 256 * 1024 * 1024, // 256 MB
117            max_bytes_for_level_multiplier: 10.0,
118            target_file_size_base: 64 * 1024 * 1024, // 64 MB
119            target_file_size_multiplier: 1.0,
120            max_concurrent_compactions: 4,
121            tombstone_sample_rate: 0.01,
122        }
123    }
124}
125
126/// File metadata for compaction decisions
127#[derive(Debug, Clone)]
128pub struct CompactionFile {
129    /// File ID
130    pub id: u64,
131    /// Level (0 = memtable flush, 1+ = compacted levels)
132    pub level: u32,
133    /// File size in bytes
134    pub size: u64,
135    /// Smallest key
136    pub smallest_key: Vec<u8>,
137    /// Largest key
138    pub largest_key: Vec<u8>,
139    /// Number of entries
140    pub num_entries: u64,
141    /// Number of deletions (tombstones)
142    pub num_deletions: u64,
143    /// Number of old versions (non-latest)
144    pub num_old_versions: u64,
145    /// Oldest entry timestamp
146    pub oldest_timestamp: Timestamp,
147    /// Newest entry timestamp
148    pub newest_timestamp: Timestamp,
149    /// Creation time
150    pub created_at: Instant,
151}
152
153impl CompactionFile {
154    /// Calculate tombstone density
155    pub fn tombstone_density(&self) -> f64 {
156        if self.num_entries == 0 {
157            0.0
158        } else {
159            self.num_deletions as f64 / self.num_entries as f64
160        }
161    }
162
163    /// Calculate version density (ratio of old versions)
164    pub fn version_density(&self) -> f64 {
165        if self.num_entries == 0 {
166            0.0
167        } else {
168            self.num_old_versions as f64 / self.num_entries as f64
169        }
170    }
171
172    /// Calculate garbage ratio (tombstones + old versions)
173    pub fn garbage_ratio(&self) -> f64 {
174        if self.num_entries == 0 {
175            0.0
176        } else {
177            (self.num_deletions + self.num_old_versions) as f64 / self.num_entries as f64
178        }
179    }
180
181    /// Check if file overlaps with key range
182    pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
183        self.smallest_key.as_slice() <= largest && self.largest_key.as_slice() >= smallest
184    }
185}
186
187/// Compaction job description
188#[derive(Debug, Clone)]
189pub struct CompactionJob {
190    /// Job ID
191    pub id: u64,
192    /// Input files
193    pub inputs: Vec<CompactionFile>,
194    /// Target level
195    pub target_level: u32,
196    /// Priority (higher = more urgent)
197    pub priority: CompactionPriority,
198    /// Estimated output size
199    pub estimated_output_size: u64,
200    /// Reason for compaction
201    pub reason: CompactionReason,
202}
203
204/// Compaction priority
205#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
206pub enum CompactionPriority {
207    /// Low priority - background maintenance
208    Low = 0,
209    /// Normal priority - regular compaction
210    Normal = 1,
211    /// High priority - L0 file count is getting high
212    High = 2,
213    /// Urgent - approaching write stall
214    Urgent = 3,
215}
216
217/// Reason for triggering compaction
218#[derive(Debug, Clone, Copy, PartialEq, Eq)]
219pub enum CompactionReason {
220    /// L0 file count trigger
221    L0FileCount,
222    /// Level size limit exceeded
223    LevelSizeLimit,
224    /// High tombstone density
225    TombstoneDensity,
226    /// High old version density
227    VersionPruning,
228    /// TTL expiration
229    TtlExpiration,
230    /// Manual trigger
231    Manual,
232    /// Forced flush
233    ForcedFlush,
234}
235
236/// Compaction picker - selects files for compaction
237pub trait CompactionPicker: Send + Sync {
238    /// Pick the next compaction job
239    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;
240
241    /// Calculate level targets
242    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
243}
244
245/// State of the LSM tree for compaction decisions
246#[derive(Debug, Default)]
247pub struct CompactionState {
248    /// Files by level
249    pub files_by_level: Vec<Vec<CompactionFile>>,
250    /// Total size by level
251    pub size_by_level: Vec<u64>,
252    /// Current timestamp for age calculations
253    pub current_time: Timestamp,
254    /// Oldest readable snapshot
255    pub oldest_snapshot: Timestamp,
256}
257
258impl CompactionState {
259    /// Get total number of L0 files
260    pub fn l0_file_count(&self) -> usize {
261        self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
262    }
263
264    /// Get level count
265    pub fn level_count(&self) -> usize {
266        self.files_by_level.len()
267    }
268
269    /// Find overlapping files in a level
270    pub fn find_overlapping(&self, level: usize, smallest: &[u8], largest: &[u8]) -> Vec<&CompactionFile> {
271        self.files_by_level
272            .get(level)
273            .map(|files| {
274                files.iter().filter(|f| f.overlaps(smallest, largest)).collect()
275            })
276            .unwrap_or_default()
277    }
278}
279
280/// Leveled compaction picker
281pub struct LeveledCompactionPicker {
282    config: CompactionConfig,
283    job_counter: AtomicU64,
284}
285
286impl LeveledCompactionPicker {
287    pub fn new(config: CompactionConfig) -> Self {
288        Self {
289            config,
290            job_counter: AtomicU64::new(0),
291        }
292    }
293
294    /// Pick L0 compaction
295    fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
296        let l0_files = state.files_by_level.get(0)?;
297        
298        if l0_files.len() < self.config.l0_compaction_trigger {
299            return None;
300        }
301
302        // All L0 files go to L1
303        let inputs: Vec<_> = l0_files.clone();
304        if inputs.is_empty() {
305            return None;
306        }
307
308        // Find overlapping L1 files
309        let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
310        let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
311
312        let l1_overlapping = state.find_overlapping(1, smallest, largest);
313        let mut all_inputs = inputs;
314        all_inputs.extend(l1_overlapping.into_iter().cloned());
315
316        let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
317            CompactionPriority::Urgent
318        } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
319            CompactionPriority::High
320        } else {
321            CompactionPriority::Normal
322        };
323
324        let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
325
326        Some(CompactionJob {
327            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
328            inputs: all_inputs,
329            target_level: 1,
330            priority,
331            estimated_output_size,
332            reason: CompactionReason::L0FileCount,
333        })
334    }
335
336    /// Pick level-to-level compaction based on size
337    fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
338        let targets = self.calculate_level_targets(state);
339
340        // Find level that most exceeds its target
341        let mut max_score = 0.0f64;
342        let mut pick_level = None;
343
344        for level in 1..state.level_count() {
345            if level >= targets.len() {
346                continue;
347            }
348            let target = targets[level];
349            if target == 0 {
350                continue;
351            }
352            let actual = state.size_by_level.get(level).copied().unwrap_or(0);
353            let score = actual as f64 / target as f64;
354            if score > max_score && score > 1.0 {
355                max_score = score;
356                pick_level = Some(level);
357            }
358        }
359
360        let level = pick_level?;
361        let files = state.files_by_level.get(level)?;
362
363        // Pick file with most garbage for version/tombstone pruning
364        let pick_file = files
365            .iter()
366            .max_by(|a, b| {
367                a.garbage_ratio()
368                    .partial_cmp(&b.garbage_ratio())
369                    .unwrap_or(Ordering::Equal)
370            })?
371            .clone();
372
373        // Find overlapping files in next level
374        let next_level = level + 1;
375        let overlapping = state.find_overlapping(
376            next_level,
377            &pick_file.smallest_key,
378            &pick_file.largest_key,
379        );
380
381        let mut inputs = vec![pick_file];
382        inputs.extend(overlapping.into_iter().cloned());
383
384        let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
385
386        Some(CompactionJob {
387            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
388            inputs,
389            target_level: next_level as u32,
390            priority: CompactionPriority::Normal,
391            estimated_output_size,
392            reason: if max_score > 2.0 {
393                CompactionReason::LevelSizeLimit
394            } else {
395                CompactionReason::VersionPruning
396            },
397        })
398    }
399
400    /// Pick tombstone-driven compaction
401    fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
402        // Find file with highest tombstone density
403        for level in 0..state.level_count() {
404            if let Some(files) = state.files_by_level.get(level) {
405                for file in files {
406                    // Compact if tombstone density > 50%
407                    if file.tombstone_density() > 0.5 {
408                        let overlapping = state.find_overlapping(
409                            level + 1,
410                            &file.smallest_key,
411                            &file.largest_key,
412                        );
413
414                        let mut inputs = vec![file.clone()];
415                        inputs.extend(overlapping.into_iter().cloned());
416
417                        return Some(CompactionJob {
418                            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
419                            inputs,
420                            target_level: (level + 1) as u32,
421                            priority: CompactionPriority::Normal,
422                            estimated_output_size: file.size / 2, // Estimate 50% reduction
423                            reason: CompactionReason::TombstoneDensity,
424                        });
425                    }
426                }
427            }
428        }
429
430        None
431    }
432}
433
434impl CompactionPicker for LeveledCompactionPicker {
435    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
436        // Priority order:
437        // 1. L0 compaction (avoid write stalls)
438        // 2. Tombstone-driven compaction (reclaim space)
439        // 3. Level size compaction
440
441        self.pick_l0_compaction(state)
442            .or_else(|| self.pick_tombstone_compaction(state))
443            .or_else(|| self.pick_level_compaction(state))
444    }
445
446    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
447        let mut targets = vec![0u64; state.level_count().max(7)];
448
449        // L0 doesn't have a size target, it has a file count trigger
450        targets[0] = 0;
451
452        // L1 is the base
453        targets[1] = self.config.max_bytes_for_level_base;
454
455        // Each subsequent level is multiplied
456        for level in 2..targets.len() {
457            targets[level] = (targets[level - 1] as f64
458                * self.config.max_bytes_for_level_multiplier) as u64;
459        }
460
461        targets
462    }
463}
464
465/// Universal compaction picker (for write-heavy workloads)
466pub struct UniversalCompactionPicker {
467    config: CompactionConfig,
468    job_counter: AtomicU64,
469}
470
471impl UniversalCompactionPicker {
472    pub fn new(config: CompactionConfig) -> Self {
473        Self {
474            config,
475            job_counter: AtomicU64::new(0),
476        }
477    }
478}
479
480impl CompactionPicker for UniversalCompactionPicker {
481    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
482        // Universal compaction: pick sorted runs to merge based on size ratios
483        let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
484
485        if all_files.len() < 2 {
486            return None;
487        }
488
489        // Sort by creation time (oldest first for FIFO-like behavior)
490        let mut sorted_files = all_files;
491        sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
492
493        // Pick adjacent runs where size ratio is acceptable
494        let size_ratio_threshold = 2.0;
495        let mut inputs = Vec::new();
496        let mut total_size = 0u64;
497
498        for file in sorted_files {
499            if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
500                total_size += file.size;
501                inputs.push(file);
502            } else {
503                break;
504            }
505        }
506
507        if inputs.len() < 2 {
508            return None;
509        }
510
511        Some(CompactionJob {
512            id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
513            inputs,
514            target_level: 0, // Universal keeps everything at L0
515            priority: CompactionPriority::Normal,
516            estimated_output_size: total_size,
517            reason: CompactionReason::LevelSizeLimit,
518        })
519    }
520
521    fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
522        // Universal compaction doesn't use level targets
523        vec![]
524    }
525}
526
527/// Version pruning filter
528///
529/// Determines which versions can be safely garbage collected.
530pub struct VersionPruner {
531    /// Retention configuration
532    config: RetentionConfig,
533    /// Set of active snapshot timestamps
534    active_snapshots: RwLock<HashSet<Timestamp>>,
535    /// Current timestamp
536    current_time: AtomicU64,
537}
538
539impl VersionPruner {
540    pub fn new(config: RetentionConfig) -> Self {
541        let now = SystemTime::now()
542            .duration_since(UNIX_EPOCH)
543            .map(|d| d.as_millis() as u64)
544            .unwrap_or(0);
545
546        Self {
547            config,
548            active_snapshots: RwLock::new(HashSet::new()),
549            current_time: AtomicU64::new(now),
550        }
551    }
552
553    /// Register an active snapshot
554    pub fn register_snapshot(&self, timestamp: Timestamp) {
555        self.active_snapshots.write().insert(timestamp);
556    }
557
558    /// Unregister a snapshot
559    pub fn unregister_snapshot(&self, timestamp: Timestamp) {
560        self.active_snapshots.write().remove(&timestamp);
561    }
562
563    /// Get the oldest active snapshot
564    pub fn oldest_snapshot(&self) -> Option<Timestamp> {
565        self.active_snapshots.read().iter().min().copied()
566    }
567
568    /// Check if a version can be pruned
569    ///
570    /// A version can be pruned if:
571    /// 1. It's not the latest version
572    /// 2. It's older than max_version_age
573    /// 3. No active snapshot needs it
574    ///
575    /// A snapshot at time T needs all versions with timestamp <= T
576    /// (these are the versions that were visible at snapshot time)
577    pub fn can_prune_version(
578        &self,
579        version_timestamp: Timestamp,
580        is_latest: bool,
581        version_index: usize,
582    ) -> bool {
583        // Never prune the latest version
584        if is_latest {
585            return false;
586        }
587
588        // Check if any snapshot protects this version
589        // A snapshot at time T protects versions with timestamp <= T
590        if let Some(oldest) = self.oldest_snapshot() {
591            if version_timestamp <= oldest {
592                // Version was visible to the oldest snapshot, cannot prune
593                return false;
594            }
595        }
596
597        // Check max versions per key
598        if version_index >= self.config.max_versions_per_key {
599            return true;
600        }
601
602        // Check age-based pruning
603        if let Some(max_age) = self.config.max_version_age {
604            let now = self.current_time.load(AtomicOrdering::Relaxed);
605            let age_ms = now.saturating_sub(version_timestamp);
606            let max_age_ms = max_age.as_millis() as u64;
607
608            if age_ms > max_age_ms {
609                return true;
610            }
611        }
612
613        false
614    }
615
616    /// Check if a tombstone can be collected
617    ///
618    /// A tombstone can be collected only if:
619    /// 1. It's older than the grace period
620    /// 2. No active snapshot needs to see it
621    pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
622        let now = self.current_time.load(AtomicOrdering::Relaxed);
623        let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
624
625        if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
626            return false;
627        }
628
629        // Check if any snapshot protects this tombstone
630        if let Some(oldest) = self.oldest_snapshot() {
631            if tombstone_timestamp <= oldest {
632                // Tombstone was visible to the oldest snapshot, cannot collect
633                return false;
634            }
635        }
636
637        true
638    }
639
640    /// Update current time
641    pub fn update_time(&self) {
642        let now = SystemTime::now()
643            .duration_since(UNIX_EPOCH)
644            .map(|d| d.as_millis() as u64)
645            .unwrap_or(0);
646        self.current_time.store(now, AtomicOrdering::Relaxed);
647    }
648}
649
650/// Compaction statistics
651#[derive(Debug, Default, Clone)]
652pub struct CompactionStats {
653    /// Number of compactions completed
654    pub compactions_completed: u64,
655    /// Total bytes read during compaction
656    pub bytes_read: u64,
657    /// Total bytes written during compaction
658    pub bytes_written: u64,
659    /// Entries processed
660    pub entries_processed: u64,
661    /// Tombstones collected
662    pub tombstones_collected: u64,
663    /// Old versions pruned
664    pub versions_pruned: u64,
665    /// Write amplification factor
666    pub write_amplification: f64,
667    /// Space amplification factor
668    pub space_amplification: f64,
669}
670
671// =============================================================================
672// Tests
673// =============================================================================
674
675#[cfg(test)]
676mod tests {
677    use super::*;
678
679    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
680        CompactionFile {
681            id,
682            level,
683            size,
684            smallest_key: smallest.as_bytes().to_vec(),
685            largest_key: largest.as_bytes().to_vec(),
686            num_entries: 1000,
687            num_deletions: 0,
688            num_old_versions: 0,
689            oldest_timestamp: 0,
690            newest_timestamp: 100,
691            created_at: Instant::now(),
692        }
693    }
694
695    #[test]
696    fn test_leveled_picker_l0() {
697        let picker = LeveledCompactionPicker::new(CompactionConfig {
698            l0_compaction_trigger: 4,
699            ..Default::default()
700        });
701
702        let mut state = CompactionState::default();
703        state.files_by_level = vec![
704            vec![
705                make_file(1, 0, 1000, "a", "d"),
706                make_file(2, 0, 1000, "c", "f"),
707                make_file(3, 0, 1000, "e", "h"),
708                make_file(4, 0, 1000, "g", "j"),
709            ],
710            vec![make_file(10, 1, 10000, "a", "z")],
711        ];
712
713        let job = picker.pick_compaction(&state);
714        assert!(job.is_some());
715
716        let job = job.unwrap();
717        assert_eq!(job.target_level, 1);
718        assert!(job.inputs.len() >= 4); // All L0 files + overlapping L1
719    }
720
721    #[test]
722    fn test_tombstone_density() {
723        let mut file = make_file(1, 0, 1000, "a", "z");
724        file.num_entries = 100;
725        file.num_deletions = 60;
726
727        assert!(file.tombstone_density() > 0.5);
728    }
729
730    #[test]
731    fn test_version_pruner() {
732        let config = RetentionConfig {
733            max_version_age: Some(Duration::from_secs(3600)),
734            max_versions_per_key: 5,
735            tombstone_grace_period: Duration::from_secs(60),
736            min_file_age: Duration::from_secs(60),
737        };
738
739        let pruner = VersionPruner::new(config);
740
741        // Latest version should never be pruned
742        assert!(!pruner.can_prune_version(0, true, 0));
743
744        // Version beyond max_versions_per_key can be pruned (if no snapshots)
745        assert!(pruner.can_prune_version(0, false, 10));
746
747        // Register a snapshot at timestamp 1000
748        let snapshot_ts = 1000;
749        pruner.register_snapshot(snapshot_ts);
750
751        // Version with timestamp 500 (before snapshot) is protected by the snapshot
752        // because the snapshot might need to read it
753        assert!(!pruner.can_prune_version(500, false, 10));
754
755        // Version with timestamp 1500 (after snapshot) can be pruned
756        // because it was written after the snapshot was taken
757        assert!(pruner.can_prune_version(1500, false, 10));
758    }
759
760    #[test]
761    fn test_level_targets() {
762        let picker = LeveledCompactionPicker::new(CompactionConfig {
763            max_bytes_for_level_base: 256 * 1024 * 1024,
764            max_bytes_for_level_multiplier: 10.0,
765            ..Default::default()
766        });
767
768        let state = CompactionState {
769            files_by_level: vec![vec![], vec![], vec![], vec![]],
770            size_by_level: vec![0, 0, 0, 0],
771            current_time: 0,
772            oldest_snapshot: 0,
773        };
774
775        let targets = picker.calculate_level_targets(&state);
776
777        assert_eq!(targets[0], 0); // L0 has no size target
778        assert_eq!(targets[1], 256 * 1024 * 1024);
779        assert_eq!(targets[2], 2560 * 1024 * 1024); // 10x L1
780    }
781
782    #[test]
783    fn test_compaction_priority() {
784        assert!(CompactionPriority::Urgent > CompactionPriority::High);
785        assert!(CompactionPriority::High > CompactionPriority::Normal);
786        assert!(CompactionPriority::Normal > CompactionPriority::Low);
787    }
788
789    #[test]
790    fn test_file_overlaps() {
791        let file = make_file(1, 0, 1000, "d", "h");
792
793        assert!(file.overlaps(b"a", b"e")); // Overlaps start
794        assert!(file.overlaps(b"f", b"z")); // Overlaps end
795        assert!(file.overlaps(b"e", b"g")); // Contained
796        assert!(file.overlaps(b"a", b"z")); // Contains file
797        assert!(!file.overlaps(b"a", b"c")); // Before
798        assert!(!file.overlaps(b"i", b"z")); // After
799    }
800}