use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use parking_lot::RwLock;
39
/// Logical timestamp attached to versions, tombstones and snapshots.
///
/// `VersionPruner` subtracts these values from a clock expressed in
/// milliseconds since the UNIX epoch, so they are presumably in that same
/// unit — TODO confirm against the code that stamps writes.
pub type Timestamp = u64;
42
/// Retention policy that bounds how much historical data is kept.
///
/// The version and tombstone limits are consulted by `VersionPruner`.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Age beyond which a non-latest version becomes prunable; `None`
    /// disables the age-based rule.
    pub max_version_age: Option<Duration>,
    /// Versions whose index is greater than or equal to this are prunable.
    pub max_versions_per_key: usize,
    /// How long a tombstone must exist before it may be collected.
    pub tombstone_grace_period: Duration,
    /// Presumably the minimum age of a file before it is compacted.
    /// NOTE(review): not read anywhere in the visible code — confirm the consumer.
    pub min_file_age: Duration,
}

impl Default for RetentionConfig {
    /// Keeps versions for 7 days (at most 10 per key), tombstones for
    /// 24 hours, and uses a 60-second minimum file age.
    fn default() -> Self {
        const SECS_PER_HOUR: u64 = 60 * 60;
        const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR;

        Self {
            max_version_age: Some(Duration::from_secs(7 * SECS_PER_DAY)),
            max_versions_per_key: 10,
            tombstone_grace_period: Duration::from_secs(SECS_PER_DAY),
            min_file_age: Duration::from_secs(60),
        }
    }
}
66
/// Selects which compaction algorithm drives file selection.
///
/// NOTE(review): only `Leveled` and `Universal` have a matching picker in the
/// visible part of this file; the intent of `Fifo` and `Tiered` is inferred
/// from their names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Level-based compaction; this is the default.
    Leveled,
    /// Universal compaction.
    Universal,
    /// FIFO compaction.
    Fifo,
    /// Tiered compaction.
    Tiered,
}

impl Default for CompactionStrategy {
    /// Returns [`CompactionStrategy::Leveled`].
    fn default() -> Self {
        CompactionStrategy::Leveled
    }
}
85
86#[derive(Debug, Clone)]
88pub struct CompactionConfig {
89 pub strategy: CompactionStrategy,
91 pub retention: RetentionConfig,
93 pub l0_compaction_trigger: usize,
95 pub l0_stop_writes_trigger: usize,
97 pub max_bytes_for_level_base: u64,
99 pub max_bytes_for_level_multiplier: f64,
101 pub target_file_size_base: u64,
103 pub target_file_size_multiplier: f64,
105 pub max_concurrent_compactions: usize,
107 pub tombstone_sample_rate: f64,
109}
110
111impl Default for CompactionConfig {
112 fn default() -> Self {
113 Self {
114 strategy: CompactionStrategy::Leveled,
115 retention: RetentionConfig::default(),
116 l0_compaction_trigger: 4,
117 l0_stop_writes_trigger: 12,
118 max_bytes_for_level_base: 256 * 1024 * 1024, max_bytes_for_level_multiplier: 10.0,
120 target_file_size_base: 64 * 1024 * 1024, target_file_size_multiplier: 1.0,
122 max_concurrent_compactions: 4,
123 tombstone_sample_rate: 0.01,
124 }
125 }
126}
127
128#[derive(Debug, Clone)]
130pub struct CompactionFile {
131 pub id: u64,
133 pub level: u32,
135 pub size: u64,
137 pub smallest_key: Vec<u8>,
139 pub largest_key: Vec<u8>,
141 pub num_entries: u64,
143 pub num_deletions: u64,
145 pub num_old_versions: u64,
147 pub oldest_timestamp: Timestamp,
149 pub newest_timestamp: Timestamp,
151 pub created_at: Instant,
153}
154
155impl CompactionFile {
156 pub fn tombstone_density(&self) -> f64 {
158 if self.num_entries == 0 {
159 0.0
160 } else {
161 self.num_deletions as f64 / self.num_entries as f64
162 }
163 }
164
165 pub fn version_density(&self) -> f64 {
167 if self.num_entries == 0 {
168 0.0
169 } else {
170 self.num_old_versions as f64 / self.num_entries as f64
171 }
172 }
173
174 pub fn garbage_ratio(&self) -> f64 {
176 if self.num_entries == 0 {
177 0.0
178 } else {
179 (self.num_deletions + self.num_old_versions) as f64 / self.num_entries as f64
180 }
181 }
182
183 pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
185 self.smallest_key.as_slice() <= largest && self.largest_key.as_slice() >= smallest
186 }
187}
188
189#[derive(Debug, Clone)]
191pub struct CompactionJob {
192 pub id: u64,
194 pub inputs: Vec<CompactionFile>,
196 pub target_level: u32,
198 pub priority: CompactionPriority,
200 pub estimated_output_size: u64,
202 pub reason: CompactionReason,
204}
205
/// Scheduling priority of a compaction job.
///
/// The derived ordering follows the discriminants, so
/// `Low < Normal < High < Urgent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompactionPriority {
    /// Lowest priority.
    Low = 0,
    /// Default priority.
    Normal = 1,
    /// Elevated priority.
    High = 2,
    /// Highest priority.
    Urgent = 3,
}
218
/// Why a compaction job was created.
///
/// NOTE(review): the pickers visible in this file only emit `L0FileCount`,
/// `LevelSizeLimit`, `TombstoneDensity` and `VersionPruning`; the remaining
/// variants are presumably produced elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionReason {
    /// Level 0 holds at least the configured number of files.
    L0FileCount,
    /// A level exceeded its size target.
    LevelSizeLimit,
    /// A file holds a high fraction of tombstones.
    TombstoneDensity,
    /// Old versions are being cleaned up.
    VersionPruning,
    /// Data outlived its time-to-live.
    TtlExpiration,
    /// Compaction was requested explicitly.
    Manual,
    /// Compaction follows a forced flush.
    ForcedFlush,
}
237
238pub trait CompactionPicker: Send + Sync {
240 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;
242
243 fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
245}
246
247#[derive(Debug, Default)]
249pub struct CompactionState {
250 pub files_by_level: Vec<Vec<CompactionFile>>,
252 pub size_by_level: Vec<u64>,
254 pub current_time: Timestamp,
256 pub oldest_snapshot: Timestamp,
258}
259
260impl CompactionState {
261 pub fn l0_file_count(&self) -> usize {
263 self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
264 }
265
266 pub fn level_count(&self) -> usize {
268 self.files_by_level.len()
269 }
270
271 pub fn find_overlapping(
273 &self,
274 level: usize,
275 smallest: &[u8],
276 largest: &[u8],
277 ) -> Vec<&CompactionFile> {
278 self.files_by_level
279 .get(level)
280 .map(|files| {
281 files
282 .iter()
283 .filter(|f| f.overlaps(smallest, largest))
284 .collect()
285 })
286 .unwrap_or_default()
287 }
288}
289
290pub struct LeveledCompactionPicker {
292 config: CompactionConfig,
293 job_counter: AtomicU64,
294}
295
296impl LeveledCompactionPicker {
297 pub fn new(config: CompactionConfig) -> Self {
298 Self {
299 config,
300 job_counter: AtomicU64::new(0),
301 }
302 }
303
304 fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
306 let l0_files = state.files_by_level.get(0)?;
307
308 if l0_files.len() < self.config.l0_compaction_trigger {
309 return None;
310 }
311
312 let inputs: Vec<_> = l0_files.clone();
314 if inputs.is_empty() {
315 return None;
316 }
317
318 let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
320 let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
321
322 let l1_overlapping = state.find_overlapping(1, smallest, largest);
323 let mut all_inputs = inputs;
324 all_inputs.extend(l1_overlapping.into_iter().cloned());
325
326 let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
327 CompactionPriority::Urgent
328 } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
329 CompactionPriority::High
330 } else {
331 CompactionPriority::Normal
332 };
333
334 let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
335
336 Some(CompactionJob {
337 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
338 inputs: all_inputs,
339 target_level: 1,
340 priority,
341 estimated_output_size,
342 reason: CompactionReason::L0FileCount,
343 })
344 }
345
346 fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
348 let targets = self.calculate_level_targets(state);
349
350 let mut max_score = 0.0f64;
352 let mut pick_level = None;
353
354 for level in 1..state.level_count() {
355 if level >= targets.len() {
356 continue;
357 }
358 let target = targets[level];
359 if target == 0 {
360 continue;
361 }
362 let actual = state.size_by_level.get(level).copied().unwrap_or(0);
363 let score = actual as f64 / target as f64;
364 if score > max_score && score > 1.0 {
365 max_score = score;
366 pick_level = Some(level);
367 }
368 }
369
370 let level = pick_level?;
371 let files = state.files_by_level.get(level)?;
372
373 let pick_file = files
375 .iter()
376 .max_by(|a, b| {
377 a.garbage_ratio()
378 .partial_cmp(&b.garbage_ratio())
379 .unwrap_or(Ordering::Equal)
380 })?
381 .clone();
382
383 let next_level = level + 1;
385 let overlapping =
386 state.find_overlapping(next_level, &pick_file.smallest_key, &pick_file.largest_key);
387
388 let mut inputs = vec![pick_file];
389 inputs.extend(overlapping.into_iter().cloned());
390
391 let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
392
393 Some(CompactionJob {
394 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
395 inputs,
396 target_level: next_level as u32,
397 priority: CompactionPriority::Normal,
398 estimated_output_size,
399 reason: if max_score > 2.0 {
400 CompactionReason::LevelSizeLimit
401 } else {
402 CompactionReason::VersionPruning
403 },
404 })
405 }
406
407 fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
409 for level in 0..state.level_count() {
411 if let Some(files) = state.files_by_level.get(level) {
412 for file in files {
413 if file.tombstone_density() > 0.5 {
415 let overlapping = state.find_overlapping(
416 level + 1,
417 &file.smallest_key,
418 &file.largest_key,
419 );
420
421 let mut inputs = vec![file.clone()];
422 inputs.extend(overlapping.into_iter().cloned());
423
424 return Some(CompactionJob {
425 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
426 inputs,
427 target_level: (level + 1) as u32,
428 priority: CompactionPriority::Normal,
429 estimated_output_size: file.size / 2, reason: CompactionReason::TombstoneDensity,
431 });
432 }
433 }
434 }
435 }
436
437 None
438 }
439}
440
441impl CompactionPicker for LeveledCompactionPicker {
442 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
443 self.pick_l0_compaction(state)
449 .or_else(|| self.pick_tombstone_compaction(state))
450 .or_else(|| self.pick_level_compaction(state))
451 }
452
453 fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
454 let mut targets = vec![0u64; state.level_count().max(7)];
455
456 targets[0] = 0;
458
459 targets[1] = self.config.max_bytes_for_level_base;
461
462 for level in 2..targets.len() {
464 targets[level] =
465 (targets[level - 1] as f64 * self.config.max_bytes_for_level_multiplier) as u64;
466 }
467
468 targets
469 }
470}
471
472pub struct UniversalCompactionPicker {
474 config: CompactionConfig,
475 job_counter: AtomicU64,
476}
477
478impl UniversalCompactionPicker {
479 pub fn new(config: CompactionConfig) -> Self {
480 Self {
481 config,
482 job_counter: AtomicU64::new(0),
483 }
484 }
485}
486
487impl CompactionPicker for UniversalCompactionPicker {
488 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
489 let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
491
492 if all_files.len() < 2 {
493 return None;
494 }
495
496 let mut sorted_files = all_files;
498 sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
499
500 let size_ratio_threshold = 2.0;
502 let mut inputs = Vec::new();
503 let mut total_size = 0u64;
504
505 for file in sorted_files {
506 if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
507 total_size += file.size;
508 inputs.push(file);
509 } else {
510 break;
511 }
512 }
513
514 if inputs.len() < 2 {
515 return None;
516 }
517
518 Some(CompactionJob {
519 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
520 inputs,
521 target_level: 0, priority: CompactionPriority::Normal,
523 estimated_output_size: total_size,
524 reason: CompactionReason::LevelSizeLimit,
525 })
526 }
527
528 fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
529 vec![]
531 }
532}
533
534pub struct VersionPruner {
538 config: RetentionConfig,
540 active_snapshots: RwLock<HashSet<Timestamp>>,
542 current_time: AtomicU64,
544}
545
546impl VersionPruner {
547 pub fn new(config: RetentionConfig) -> Self {
548 let now = SystemTime::now()
549 .duration_since(UNIX_EPOCH)
550 .map(|d| d.as_millis() as u64)
551 .unwrap_or(0);
552
553 Self {
554 config,
555 active_snapshots: RwLock::new(HashSet::new()),
556 current_time: AtomicU64::new(now),
557 }
558 }
559
560 pub fn register_snapshot(&self, timestamp: Timestamp) {
562 self.active_snapshots.write().insert(timestamp);
563 }
564
565 pub fn unregister_snapshot(&self, timestamp: Timestamp) {
567 self.active_snapshots.write().remove(×tamp);
568 }
569
570 pub fn oldest_snapshot(&self) -> Option<Timestamp> {
572 self.active_snapshots.read().iter().min().copied()
573 }
574
575 pub fn can_prune_version(
585 &self,
586 version_timestamp: Timestamp,
587 is_latest: bool,
588 version_index: usize,
589 ) -> bool {
590 if is_latest {
592 return false;
593 }
594
595 if let Some(oldest) = self.oldest_snapshot() {
598 if version_timestamp <= oldest {
599 return false;
601 }
602 }
603
604 if version_index >= self.config.max_versions_per_key {
606 return true;
607 }
608
609 if let Some(max_age) = self.config.max_version_age {
611 let now = self.current_time.load(AtomicOrdering::Relaxed);
612 let age_ms = now.saturating_sub(version_timestamp);
613 let max_age_ms = max_age.as_millis() as u64;
614
615 if age_ms > max_age_ms {
616 return true;
617 }
618 }
619
620 false
621 }
622
623 pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
629 let now = self.current_time.load(AtomicOrdering::Relaxed);
630 let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
631
632 if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
633 return false;
634 }
635
636 if let Some(oldest) = self.oldest_snapshot() {
638 if tombstone_timestamp <= oldest {
639 return false;
641 }
642 }
643
644 true
645 }
646
647 pub fn update_time(&self) {
649 let now = SystemTime::now()
650 .duration_since(UNIX_EPOCH)
651 .map(|d| d.as_millis() as u64)
652 .unwrap_or(0);
653 self.current_time.store(now, AtomicOrdering::Relaxed);
654 }
655}
656
/// Running totals describing completed compaction work.
///
/// All fields default to zero.
#[derive(Debug, Default, Clone)]
pub struct CompactionStats {
    /// Number of compactions that finished.
    pub compactions_completed: u64,
    /// Bytes read by compactions.
    pub bytes_read: u64,
    /// Bytes written by compactions.
    pub bytes_written: u64,
    /// Entries processed by compactions.
    pub entries_processed: u64,
    /// Tombstones dropped during compaction.
    pub tombstones_collected: u64,
    /// Old versions dropped during compaction.
    pub versions_pruned: u64,
    /// Write amplification factor.
    /// NOTE(review): the formula is not visible in this file — confirm with the producer.
    pub write_amplification: f64,
    /// Space amplification factor.
    /// NOTE(review): the formula is not visible in this file — confirm with the producer.
    pub space_amplification: f64,
}
677
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a file with 1000 entries, no tombstones and no old versions,
    /// covering the key range `[smallest, largest]`.
    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
        CompactionFile {
            id,
            level,
            size,
            smallest_key: smallest.as_bytes().to_vec(),
            largest_key: largest.as_bytes().to_vec(),
            num_entries: 1000,
            num_deletions: 0,
            num_old_versions: 0,
            oldest_timestamp: 0,
            newest_timestamp: 100,
            created_at: Instant::now(),
        }
    }

    #[test]
    fn test_leveled_picker_l0() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            l0_compaction_trigger: 4,
            ..Default::default()
        });

        // Four level-0 files reach the trigger; the level-1 file overlaps them all.
        let state = CompactionState {
            files_by_level: vec![
                vec![
                    make_file(1, 0, 1000, "a", "d"),
                    make_file(2, 0, 1000, "c", "f"),
                    make_file(3, 0, 1000, "e", "h"),
                    make_file(4, 0, 1000, "g", "j"),
                ],
                vec![make_file(10, 1, 10000, "a", "z")],
            ],
            ..Default::default()
        };

        let job = picker.pick_compaction(&state);
        assert!(job.is_some());

        let job = job.unwrap();
        assert_eq!(job.target_level, 1);
        // At least every level-0 file must be part of the job.
        assert!(job.inputs.len() >= 4);
    }

    #[test]
    fn test_tombstone_density() {
        let mut file = make_file(1, 0, 1000, "a", "z");
        file.num_entries = 100;
        file.num_deletions = 60;

        assert!(file.tombstone_density() > 0.5);
    }

    #[test]
    fn test_version_pruner() {
        let config = RetentionConfig {
            max_version_age: Some(Duration::from_secs(3600)),
            max_versions_per_key: 5,
            tombstone_grace_period: Duration::from_secs(60),
            min_file_age: Duration::from_secs(60),
        };

        let pruner = VersionPruner::new(config);

        // The latest version is never prunable.
        assert!(!pruner.can_prune_version(0, true, 0));

        // Beyond the per-key version limit with no snapshots registered.
        assert!(pruner.can_prune_version(0, false, 10));

        let snapshot_ts = 1000;
        pruner.register_snapshot(snapshot_ts);

        // At or before the oldest snapshot: kept.
        assert!(!pruner.can_prune_version(500, false, 10));

        // After the oldest snapshot and beyond the version limit: prunable.
        assert!(pruner.can_prune_version(1500, false, 10));
    }

    #[test]
    fn test_level_targets() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            max_bytes_for_level_base: 256 * 1024 * 1024,
            max_bytes_for_level_multiplier: 10.0,
            ..Default::default()
        });

        let state = CompactionState {
            files_by_level: vec![vec![], vec![], vec![], vec![]],
            size_by_level: vec![0, 0, 0, 0],
            current_time: 0,
            oldest_snapshot: 0,
        };

        let targets = picker.calculate_level_targets(&state);

        // Level 0 has no size target.
        assert_eq!(targets[0], 0);
        assert_eq!(targets[1], 256 * 1024 * 1024);
        assert_eq!(targets[2], 2560 * 1024 * 1024);
    }

    #[test]
    fn test_compaction_priority() {
        assert!(CompactionPriority::Urgent > CompactionPriority::High);
        assert!(CompactionPriority::High > CompactionPriority::Normal);
        assert!(CompactionPriority::Normal > CompactionPriority::Low);
    }

    #[test]
    fn test_file_overlaps() {
        let file = make_file(1, 0, 1000, "d", "h");

        // Ranges that intersect [d, h].
        assert!(file.overlaps(b"a", b"e"));
        assert!(file.overlaps(b"f", b"z"));
        assert!(file.overlaps(b"e", b"g"));
        assert!(file.overlaps(b"a", b"z"));

        // Ranges entirely before or after [d, h].
        assert!(!file.overlaps(b"a", b"c"));
        assert!(!file.overlaps(b"i", b"z"));
    }
}