1use std::collections::{BinaryHeap, HashMap, HashSet};
34use std::cmp::Ordering;
35use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
36use std::sync::Arc;
37use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
38
39use parking_lot::RwLock;
40
/// Timestamp attached to versions, tombstones and snapshots.
///
/// `VersionPruner` measures ages by subtracting these values from a wall clock
/// expressed in milliseconds since the Unix epoch, so values are presumably on
/// that scale — NOTE(review): confirm with the producers of timestamps.
pub type Timestamp = u64;
43
/// Rules that decide how long superseded versions and deletion markers are kept.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Non-latest versions older than this (by the pruner's clock) become
    /// prunable; `None` disables age-based pruning.
    pub max_version_age: Option<Duration>,
    /// A non-latest version whose `version_index` is at or above this value
    /// becomes prunable.
    pub max_versions_per_key: usize,
    /// Minimum age a tombstone must reach before it may be collected.
    pub tombstone_grace_period: Duration,
    /// Minimum file age. NOTE(review): not read by the pickers or the pruner in
    /// this module — confirm where it is enforced.
    pub min_file_age: Duration,
}

impl Default for RetentionConfig {
    /// Seven-day version age, ten versions per key, a one-day tombstone grace
    /// period and a one-minute minimum file age.
    fn default() -> Self {
        const MINUTE_SECS: u64 = 60;
        const DAY_SECS: u64 = 24 * 60 * MINUTE_SECS;

        RetentionConfig {
            max_versions_per_key: 10,
            max_version_age: Some(Duration::from_secs(7 * DAY_SECS)),
            min_file_age: Duration::from_secs(MINUTE_SECS),
            tombstone_grace_period: Duration::from_secs(DAY_SECS),
        }
    }
}
67
/// Compaction family to run; the default is [`CompactionStrategy::Leveled`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Level-by-level compaction (`LeveledCompactionPicker`).
    #[default]
    Leveled,
    /// Universal compaction (`UniversalCompactionPicker`).
    Universal,
    /// FIFO strategy. NOTE(review): no picker for it in this module.
    Fifo,
    /// Tiered strategy. NOTE(review): no picker for it in this module.
    Tiered,
}
86
87#[derive(Debug, Clone)]
89pub struct CompactionConfig {
90 pub strategy: CompactionStrategy,
92 pub retention: RetentionConfig,
94 pub l0_compaction_trigger: usize,
96 pub l0_stop_writes_trigger: usize,
98 pub max_bytes_for_level_base: u64,
100 pub max_bytes_for_level_multiplier: f64,
102 pub target_file_size_base: u64,
104 pub target_file_size_multiplier: f64,
106 pub max_concurrent_compactions: usize,
108 pub tombstone_sample_rate: f64,
110}
111
112impl Default for CompactionConfig {
113 fn default() -> Self {
114 Self {
115 strategy: CompactionStrategy::Leveled,
116 retention: RetentionConfig::default(),
117 l0_compaction_trigger: 4,
118 l0_stop_writes_trigger: 12,
119 max_bytes_for_level_base: 256 * 1024 * 1024, max_bytes_for_level_multiplier: 10.0,
121 target_file_size_base: 64 * 1024 * 1024, target_file_size_multiplier: 1.0,
123 max_concurrent_compactions: 4,
124 tombstone_sample_rate: 0.01,
125 }
126 }
127}
128
129#[derive(Debug, Clone)]
131pub struct CompactionFile {
132 pub id: u64,
134 pub level: u32,
136 pub size: u64,
138 pub smallest_key: Vec<u8>,
140 pub largest_key: Vec<u8>,
142 pub num_entries: u64,
144 pub num_deletions: u64,
146 pub num_old_versions: u64,
148 pub oldest_timestamp: Timestamp,
150 pub newest_timestamp: Timestamp,
152 pub created_at: Instant,
154}
155
156impl CompactionFile {
157 pub fn tombstone_density(&self) -> f64 {
159 if self.num_entries == 0 {
160 0.0
161 } else {
162 self.num_deletions as f64 / self.num_entries as f64
163 }
164 }
165
166 pub fn version_density(&self) -> f64 {
168 if self.num_entries == 0 {
169 0.0
170 } else {
171 self.num_old_versions as f64 / self.num_entries as f64
172 }
173 }
174
175 pub fn garbage_ratio(&self) -> f64 {
177 if self.num_entries == 0 {
178 0.0
179 } else {
180 (self.num_deletions + self.num_old_versions) as f64 / self.num_entries as f64
181 }
182 }
183
184 pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
186 self.smallest_key.as_slice() <= largest && self.largest_key.as_slice() >= smallest
187 }
188}
189
190#[derive(Debug, Clone)]
192pub struct CompactionJob {
193 pub id: u64,
195 pub inputs: Vec<CompactionFile>,
197 pub target_level: u32,
199 pub priority: CompactionPriority,
201 pub estimated_output_size: u64,
203 pub reason: CompactionReason,
205}
206
/// Scheduling priority of a compaction job.
///
/// The derived ordering follows declaration order, so `Low < Normal < High < Urgent`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompactionPriority {
    /// Lowest priority.
    Low = 0,
    /// Default priority for routine jobs.
    Normal = 1,
    /// Elevated priority.
    High = 2,
    /// Highest priority.
    Urgent = 3,
}
219
/// Why a compaction job was scheduled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompactionReason {
    /// Level 0 accumulated enough files to trigger a compaction.
    L0FileCount,
    /// Size-driven compaction.
    LevelSizeLimit,
    /// A file's share of deletion markers crossed the picker's threshold.
    TombstoneDensity,
    /// Cleanup of superseded versions.
    VersionPruning,
    /// Time-to-live expiry (not produced by the pickers in this module).
    TtlExpiration,
    /// Explicitly requested (not produced by the pickers in this module).
    Manual,
    /// Forced flush (not produced by the pickers in this module).
    ForcedFlush,
}
238
239pub trait CompactionPicker: Send + Sync {
241 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;
243
244 fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
246}
247
248#[derive(Debug, Default)]
250pub struct CompactionState {
251 pub files_by_level: Vec<Vec<CompactionFile>>,
253 pub size_by_level: Vec<u64>,
255 pub current_time: Timestamp,
257 pub oldest_snapshot: Timestamp,
259}
260
261impl CompactionState {
262 pub fn l0_file_count(&self) -> usize {
264 self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
265 }
266
267 pub fn level_count(&self) -> usize {
269 self.files_by_level.len()
270 }
271
272 pub fn find_overlapping(&self, level: usize, smallest: &[u8], largest: &[u8]) -> Vec<&CompactionFile> {
274 self.files_by_level
275 .get(level)
276 .map(|files| {
277 files.iter().filter(|f| f.overlaps(smallest, largest)).collect()
278 })
279 .unwrap_or_default()
280 }
281}
282
283pub struct LeveledCompactionPicker {
285 config: CompactionConfig,
286 job_counter: AtomicU64,
287}
288
289impl LeveledCompactionPicker {
290 pub fn new(config: CompactionConfig) -> Self {
291 Self {
292 config,
293 job_counter: AtomicU64::new(0),
294 }
295 }
296
297 fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
299 let l0_files = state.files_by_level.get(0)?;
300
301 if l0_files.len() < self.config.l0_compaction_trigger {
302 return None;
303 }
304
305 let inputs: Vec<_> = l0_files.clone();
307 if inputs.is_empty() {
308 return None;
309 }
310
311 let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
313 let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
314
315 let l1_overlapping = state.find_overlapping(1, smallest, largest);
316 let mut all_inputs = inputs;
317 all_inputs.extend(l1_overlapping.into_iter().cloned());
318
319 let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
320 CompactionPriority::Urgent
321 } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
322 CompactionPriority::High
323 } else {
324 CompactionPriority::Normal
325 };
326
327 let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
328
329 Some(CompactionJob {
330 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
331 inputs: all_inputs,
332 target_level: 1,
333 priority,
334 estimated_output_size,
335 reason: CompactionReason::L0FileCount,
336 })
337 }
338
339 fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
341 let targets = self.calculate_level_targets(state);
342
343 let mut max_score = 0.0f64;
345 let mut pick_level = None;
346
347 for level in 1..state.level_count() {
348 if level >= targets.len() {
349 continue;
350 }
351 let target = targets[level];
352 if target == 0 {
353 continue;
354 }
355 let actual = state.size_by_level.get(level).copied().unwrap_or(0);
356 let score = actual as f64 / target as f64;
357 if score > max_score && score > 1.0 {
358 max_score = score;
359 pick_level = Some(level);
360 }
361 }
362
363 let level = pick_level?;
364 let files = state.files_by_level.get(level)?;
365
366 let pick_file = files
368 .iter()
369 .max_by(|a, b| {
370 a.garbage_ratio()
371 .partial_cmp(&b.garbage_ratio())
372 .unwrap_or(Ordering::Equal)
373 })?
374 .clone();
375
376 let next_level = level + 1;
378 let overlapping = state.find_overlapping(
379 next_level,
380 &pick_file.smallest_key,
381 &pick_file.largest_key,
382 );
383
384 let mut inputs = vec![pick_file];
385 inputs.extend(overlapping.into_iter().cloned());
386
387 let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
388
389 Some(CompactionJob {
390 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
391 inputs,
392 target_level: next_level as u32,
393 priority: CompactionPriority::Normal,
394 estimated_output_size,
395 reason: if max_score > 2.0 {
396 CompactionReason::LevelSizeLimit
397 } else {
398 CompactionReason::VersionPruning
399 },
400 })
401 }
402
403 fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
405 for level in 0..state.level_count() {
407 if let Some(files) = state.files_by_level.get(level) {
408 for file in files {
409 if file.tombstone_density() > 0.5 {
411 let overlapping = state.find_overlapping(
412 level + 1,
413 &file.smallest_key,
414 &file.largest_key,
415 );
416
417 let mut inputs = vec![file.clone()];
418 inputs.extend(overlapping.into_iter().cloned());
419
420 return Some(CompactionJob {
421 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
422 inputs,
423 target_level: (level + 1) as u32,
424 priority: CompactionPriority::Normal,
425 estimated_output_size: file.size / 2, reason: CompactionReason::TombstoneDensity,
427 });
428 }
429 }
430 }
431 }
432
433 None
434 }
435}
436
437impl CompactionPicker for LeveledCompactionPicker {
438 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
439 self.pick_l0_compaction(state)
445 .or_else(|| self.pick_tombstone_compaction(state))
446 .or_else(|| self.pick_level_compaction(state))
447 }
448
449 fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
450 let mut targets = vec![0u64; state.level_count().max(7)];
451
452 targets[0] = 0;
454
455 targets[1] = self.config.max_bytes_for_level_base;
457
458 for level in 2..targets.len() {
460 targets[level] = (targets[level - 1] as f64
461 * self.config.max_bytes_for_level_multiplier) as u64;
462 }
463
464 targets
465 }
466}
467
468pub struct UniversalCompactionPicker {
470 config: CompactionConfig,
471 job_counter: AtomicU64,
472}
473
474impl UniversalCompactionPicker {
475 pub fn new(config: CompactionConfig) -> Self {
476 Self {
477 config,
478 job_counter: AtomicU64::new(0),
479 }
480 }
481}
482
483impl CompactionPicker for UniversalCompactionPicker {
484 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
485 let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
487
488 if all_files.len() < 2 {
489 return None;
490 }
491
492 let mut sorted_files = all_files;
494 sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
495
496 let size_ratio_threshold = 2.0;
498 let mut inputs = Vec::new();
499 let mut total_size = 0u64;
500
501 for file in sorted_files {
502 if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
503 total_size += file.size;
504 inputs.push(file);
505 } else {
506 break;
507 }
508 }
509
510 if inputs.len() < 2 {
511 return None;
512 }
513
514 Some(CompactionJob {
515 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
516 inputs,
517 target_level: 0, priority: CompactionPriority::Normal,
519 estimated_output_size: total_size,
520 reason: CompactionReason::LevelSizeLimit,
521 })
522 }
523
524 fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
525 vec![]
527 }
528}
529
530pub struct VersionPruner {
534 config: RetentionConfig,
536 active_snapshots: RwLock<HashSet<Timestamp>>,
538 current_time: AtomicU64,
540}
541
542impl VersionPruner {
543 pub fn new(config: RetentionConfig) -> Self {
544 let now = SystemTime::now()
545 .duration_since(UNIX_EPOCH)
546 .map(|d| d.as_millis() as u64)
547 .unwrap_or(0);
548
549 Self {
550 config,
551 active_snapshots: RwLock::new(HashSet::new()),
552 current_time: AtomicU64::new(now),
553 }
554 }
555
556 pub fn register_snapshot(&self, timestamp: Timestamp) {
558 self.active_snapshots.write().insert(timestamp);
559 }
560
561 pub fn unregister_snapshot(&self, timestamp: Timestamp) {
563 self.active_snapshots.write().remove(×tamp);
564 }
565
566 pub fn oldest_snapshot(&self) -> Option<Timestamp> {
568 self.active_snapshots.read().iter().min().copied()
569 }
570
571 pub fn can_prune_version(
581 &self,
582 version_timestamp: Timestamp,
583 is_latest: bool,
584 version_index: usize,
585 ) -> bool {
586 if is_latest {
588 return false;
589 }
590
591 if let Some(oldest) = self.oldest_snapshot() {
594 if version_timestamp <= oldest {
595 return false;
597 }
598 }
599
600 if version_index >= self.config.max_versions_per_key {
602 return true;
603 }
604
605 if let Some(max_age) = self.config.max_version_age {
607 let now = self.current_time.load(AtomicOrdering::Relaxed);
608 let age_ms = now.saturating_sub(version_timestamp);
609 let max_age_ms = max_age.as_millis() as u64;
610
611 if age_ms > max_age_ms {
612 return true;
613 }
614 }
615
616 false
617 }
618
619 pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
625 let now = self.current_time.load(AtomicOrdering::Relaxed);
626 let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
627
628 if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
629 return false;
630 }
631
632 if let Some(oldest) = self.oldest_snapshot() {
634 if tombstone_timestamp <= oldest {
635 return false;
637 }
638 }
639
640 true
641 }
642
643 pub fn update_time(&self) {
645 let now = SystemTime::now()
646 .duration_since(UNIX_EPOCH)
647 .map(|d| d.as_millis() as u64)
648 .unwrap_or(0);
649 self.current_time.store(now, AtomicOrdering::Relaxed);
650 }
651}
652
/// Counters describing compaction activity; every field starts at zero.
///
/// NOTE(review): nothing in this module updates these fields; their units and
/// how the amplification ratios are computed are set by the code that fills
/// them in.
#[derive(Debug, Default, Clone)]
pub struct CompactionStats {
    /// Number of finished compaction jobs.
    pub compactions_completed: u64,
    /// Amount of data read by compactions.
    pub bytes_read: u64,
    /// Amount of data written by compactions.
    pub bytes_written: u64,
    /// Entries processed by compactions.
    pub entries_processed: u64,
    /// Tombstones dropped.
    pub tombstones_collected: u64,
    /// Superseded versions dropped.
    pub versions_pruned: u64,
    /// Write amplification ratio.
    pub write_amplification: f64,
    /// Space amplification ratio.
    pub space_amplification: f64,
}
673
#[cfg(test)]
mod tests {
    use super::*;

    /// A garbage-free file (1000 entries, timestamps 0..=100) covering the
    /// inclusive key range `[smallest, largest]`.
    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
        CompactionFile {
            id,
            level,
            size,
            smallest_key: smallest.as_bytes().to_vec(),
            largest_key: largest.as_bytes().to_vec(),
            num_entries: 1000,
            num_deletions: 0,
            num_old_versions: 0,
            oldest_timestamp: 0,
            newest_timestamp: 100,
            created_at: Instant::now(),
        }
    }

    #[test]
    fn test_leveled_picker_l0() {
        let config = CompactionConfig {
            l0_compaction_trigger: 4,
            ..Default::default()
        };
        let picker = LeveledCompactionPicker::new(config);

        let level0 = vec![
            make_file(1, 0, 1000, "a", "d"),
            make_file(2, 0, 1000, "c", "f"),
            make_file(3, 0, 1000, "e", "h"),
            make_file(4, 0, 1000, "g", "j"),
        ];
        let level1 = vec![make_file(10, 1, 10000, "a", "z")];
        let state = CompactionState {
            files_by_level: vec![level0, level1],
            ..Default::default()
        };

        let picked = picker.pick_compaction(&state);
        assert!(picked.is_some());
        let job = picked.unwrap();

        assert_eq!(job.target_level, 1);
        // All four L0 files take part; overlapping L1 files may be added.
        assert!(job.inputs.len() >= 4);
    }

    #[test]
    fn test_tombstone_density() {
        let file = CompactionFile {
            num_entries: 100,
            num_deletions: 60,
            ..make_file(1, 0, 1000, "a", "z")
        };

        assert!(file.tombstone_density() > 0.5);
    }

    #[test]
    fn test_version_pruner() {
        let pruner = VersionPruner::new(RetentionConfig {
            max_version_age: Some(Duration::from_secs(3600)),
            max_versions_per_key: 5,
            tombstone_grace_period: Duration::from_secs(60),
            min_file_age: Duration::from_secs(60),
        });

        // The latest version is never prunable.
        assert!(!pruner.can_prune_version(0, true, 0));
        // With no snapshots, exceeding max_versions_per_key makes it prunable.
        assert!(pruner.can_prune_version(0, false, 10));

        pruner.register_snapshot(1000);

        // A version older than the snapshot is protected...
        assert!(!pruner.can_prune_version(500, false, 10));
        // ...while a newer one is not.
        assert!(pruner.can_prune_version(1500, false, 10));
    }

    #[test]
    fn test_level_targets() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            max_bytes_for_level_base: 256 * 1024 * 1024,
            max_bytes_for_level_multiplier: 10.0,
            ..Default::default()
        });

        let state = CompactionState {
            files_by_level: vec![Vec::new(); 4],
            size_by_level: vec![0; 4],
            current_time: 0,
            oldest_snapshot: 0,
        };

        let targets = picker.calculate_level_targets(&state);

        assert_eq!(targets[0], 0);
        assert_eq!(targets[1], 256 * 1024 * 1024);
        assert_eq!(targets[2], 2560 * 1024 * 1024);
    }

    #[test]
    fn test_compaction_priority() {
        assert!(CompactionPriority::Urgent > CompactionPriority::High);
        assert!(CompactionPriority::High > CompactionPriority::Normal);
        assert!(CompactionPriority::Normal > CompactionPriority::Low);
    }

    #[test]
    fn test_file_overlaps() {
        let file = make_file(1, 0, 1000, "d", "h");

        // Partial overlap on the left and on the right.
        assert!(file.overlaps(b"a", b"e"));
        assert!(file.overlaps(b"f", b"z"));
        // Query range inside the file, and file inside the query range.
        assert!(file.overlaps(b"e", b"g"));
        assert!(file.overlaps(b"a", b"z"));
        // Disjoint on either side.
        assert!(!file.overlaps(b"a", b"c"));
        assert!(!file.overlaps(b"i", b"z"));
    }
}