1use std::collections::{BinaryHeap, HashMap, HashSet};
31use std::cmp::Ordering;
32use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
36use parking_lot::RwLock;
37
/// Logical timestamp attached to versions, tombstones and snapshots.
///
/// `VersionPruner` compares these values against a wall clock expressed in
/// milliseconds since the Unix epoch, so callers are expected to use that unit.
pub type Timestamp = u64;
40
/// Retention policy deciding how long old versions and tombstones are kept.
#[derive(Clone, Debug)]
pub struct RetentionConfig {
    /// Non-latest versions older than this may be pruned; `None` disables
    /// age-based pruning.
    pub max_version_age: Option<Duration>,
    /// Versions whose index is at or beyond this count may be pruned.
    pub max_versions_per_key: usize,
    /// Tombstones younger than this are never collected.
    pub tombstone_grace_period: Duration,
    /// Minimum age of a file; not consulted by the code in this file.
    /// NOTE(review): presumably guards very fresh files from compaction — confirm.
    pub min_file_age: Duration,
}

impl Default for RetentionConfig {
    /// Seven days of version history, ten versions per key, a one-day
    /// tombstone grace period and a one-minute minimum file age.
    fn default() -> Self {
        const SECS_PER_MINUTE: u64 = 60;
        const SECS_PER_DAY: u64 = 24 * 60 * SECS_PER_MINUTE;

        RetentionConfig {
            max_version_age: Some(Duration::from_secs(7 * SECS_PER_DAY)),
            max_versions_per_key: 10,
            tombstone_grace_period: Duration::from_secs(SECS_PER_DAY),
            min_file_age: Duration::from_secs(SECS_PER_MINUTE),
        }
    }
}
64
/// Which compaction algorithm a store is configured to use.
///
/// Only the leveled and universal strategies have a picker in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Size-targeted levels; see `LeveledCompactionPicker`.
    Leveled,
    /// Size-ratio based merging; see `UniversalCompactionPicker`.
    Universal,
    /// No picker for this variant is visible in this file.
    Fifo,
    /// No picker for this variant is visible in this file.
    Tiered,
}

impl Default for CompactionStrategy {
    /// Leveled compaction is the default strategy.
    fn default() -> Self {
        CompactionStrategy::Leveled
    }
}
83
84#[derive(Debug, Clone)]
86pub struct CompactionConfig {
87 pub strategy: CompactionStrategy,
89 pub retention: RetentionConfig,
91 pub l0_compaction_trigger: usize,
93 pub l0_stop_writes_trigger: usize,
95 pub max_bytes_for_level_base: u64,
97 pub max_bytes_for_level_multiplier: f64,
99 pub target_file_size_base: u64,
101 pub target_file_size_multiplier: f64,
103 pub max_concurrent_compactions: usize,
105 pub tombstone_sample_rate: f64,
107}
108
109impl Default for CompactionConfig {
110 fn default() -> Self {
111 Self {
112 strategy: CompactionStrategy::Leveled,
113 retention: RetentionConfig::default(),
114 l0_compaction_trigger: 4,
115 l0_stop_writes_trigger: 12,
116 max_bytes_for_level_base: 256 * 1024 * 1024, max_bytes_for_level_multiplier: 10.0,
118 target_file_size_base: 64 * 1024 * 1024, target_file_size_multiplier: 1.0,
120 max_concurrent_compactions: 4,
121 tombstone_sample_rate: 0.01,
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
128pub struct CompactionFile {
129 pub id: u64,
131 pub level: u32,
133 pub size: u64,
135 pub smallest_key: Vec<u8>,
137 pub largest_key: Vec<u8>,
139 pub num_entries: u64,
141 pub num_deletions: u64,
143 pub num_old_versions: u64,
145 pub oldest_timestamp: Timestamp,
147 pub newest_timestamp: Timestamp,
149 pub created_at: Instant,
151}
152
153impl CompactionFile {
154 pub fn tombstone_density(&self) -> f64 {
156 if self.num_entries == 0 {
157 0.0
158 } else {
159 self.num_deletions as f64 / self.num_entries as f64
160 }
161 }
162
163 pub fn version_density(&self) -> f64 {
165 if self.num_entries == 0 {
166 0.0
167 } else {
168 self.num_old_versions as f64 / self.num_entries as f64
169 }
170 }
171
172 pub fn garbage_ratio(&self) -> f64 {
174 if self.num_entries == 0 {
175 0.0
176 } else {
177 (self.num_deletions + self.num_old_versions) as f64 / self.num_entries as f64
178 }
179 }
180
181 pub fn overlaps(&self, smallest: &[u8], largest: &[u8]) -> bool {
183 self.smallest_key.as_slice() <= largest && self.largest_key.as_slice() >= smallest
184 }
185}
186
187#[derive(Debug, Clone)]
189pub struct CompactionJob {
190 pub id: u64,
192 pub inputs: Vec<CompactionFile>,
194 pub target_level: u32,
196 pub priority: CompactionPriority,
198 pub estimated_output_size: u64,
200 pub reason: CompactionReason,
202}
203
/// Scheduling urgency of a compaction job.
///
/// Variants are ordered, so `Urgent > High > Normal > Low`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompactionPriority {
    /// Lowest urgency.
    Low = 0,
    /// Default urgency used by the pickers.
    Normal = 1,
    /// Used by the leveled picker once level 0 holds twice the trigger count.
    High = 2,
    /// Used by the leveled picker once level 0 reaches the stop-writes trigger.
    Urgent = 3,
}
216
/// Why a compaction job was scheduled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompactionReason {
    /// Level 0 accumulated at least the configured number of files.
    L0FileCount,
    /// A level grew past its size target (also used by the universal picker).
    LevelSizeLimit,
    /// A file consists mostly of deletion markers.
    TombstoneDensity,
    /// Reported by the leveled picker for a mildly oversized level.
    VersionPruning,
    /// Not produced by any picker in this file.
    TtlExpiration,
    /// Not produced by any picker in this file.
    Manual,
    /// Not produced by any picker in this file.
    ForcedFlush,
}
235
/// Strategy object that decides which files to compact next.
///
/// Implementations must be `Send + Sync` so that a picker can be shared
/// between threads.
pub trait CompactionPicker: Send + Sync {
    /// Returns the next job to run for `state`, or `None` when the picker
    /// finds nothing worth compacting.
    fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob>;

    /// Returns size targets indexed by level. Implementations without
    /// per-level targets may return an empty vector.
    fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64>;
}
244
245#[derive(Debug, Default)]
247pub struct CompactionState {
248 pub files_by_level: Vec<Vec<CompactionFile>>,
250 pub size_by_level: Vec<u64>,
252 pub current_time: Timestamp,
254 pub oldest_snapshot: Timestamp,
256}
257
258impl CompactionState {
259 pub fn l0_file_count(&self) -> usize {
261 self.files_by_level.get(0).map(|f| f.len()).unwrap_or(0)
262 }
263
264 pub fn level_count(&self) -> usize {
266 self.files_by_level.len()
267 }
268
269 pub fn find_overlapping(&self, level: usize, smallest: &[u8], largest: &[u8]) -> Vec<&CompactionFile> {
271 self.files_by_level
272 .get(level)
273 .map(|files| {
274 files.iter().filter(|f| f.overlaps(smallest, largest)).collect()
275 })
276 .unwrap_or_default()
277 }
278}
279
280pub struct LeveledCompactionPicker {
282 config: CompactionConfig,
283 job_counter: AtomicU64,
284}
285
286impl LeveledCompactionPicker {
287 pub fn new(config: CompactionConfig) -> Self {
288 Self {
289 config,
290 job_counter: AtomicU64::new(0),
291 }
292 }
293
294 fn pick_l0_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
296 let l0_files = state.files_by_level.get(0)?;
297
298 if l0_files.len() < self.config.l0_compaction_trigger {
299 return None;
300 }
301
302 let inputs: Vec<_> = l0_files.clone();
304 if inputs.is_empty() {
305 return None;
306 }
307
308 let smallest = inputs.iter().map(|f| f.smallest_key.as_slice()).min()?;
310 let largest = inputs.iter().map(|f| f.largest_key.as_slice()).max()?;
311
312 let l1_overlapping = state.find_overlapping(1, smallest, largest);
313 let mut all_inputs = inputs;
314 all_inputs.extend(l1_overlapping.into_iter().cloned());
315
316 let priority = if l0_files.len() >= self.config.l0_stop_writes_trigger {
317 CompactionPriority::Urgent
318 } else if l0_files.len() >= self.config.l0_compaction_trigger * 2 {
319 CompactionPriority::High
320 } else {
321 CompactionPriority::Normal
322 };
323
324 let estimated_output_size: u64 = all_inputs.iter().map(|f| f.size).sum();
325
326 Some(CompactionJob {
327 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
328 inputs: all_inputs,
329 target_level: 1,
330 priority,
331 estimated_output_size,
332 reason: CompactionReason::L0FileCount,
333 })
334 }
335
336 fn pick_level_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
338 let targets = self.calculate_level_targets(state);
339
340 let mut max_score = 0.0f64;
342 let mut pick_level = None;
343
344 for level in 1..state.level_count() {
345 if level >= targets.len() {
346 continue;
347 }
348 let target = targets[level];
349 if target == 0 {
350 continue;
351 }
352 let actual = state.size_by_level.get(level).copied().unwrap_or(0);
353 let score = actual as f64 / target as f64;
354 if score > max_score && score > 1.0 {
355 max_score = score;
356 pick_level = Some(level);
357 }
358 }
359
360 let level = pick_level?;
361 let files = state.files_by_level.get(level)?;
362
363 let pick_file = files
365 .iter()
366 .max_by(|a, b| {
367 a.garbage_ratio()
368 .partial_cmp(&b.garbage_ratio())
369 .unwrap_or(Ordering::Equal)
370 })?
371 .clone();
372
373 let next_level = level + 1;
375 let overlapping = state.find_overlapping(
376 next_level,
377 &pick_file.smallest_key,
378 &pick_file.largest_key,
379 );
380
381 let mut inputs = vec![pick_file];
382 inputs.extend(overlapping.into_iter().cloned());
383
384 let estimated_output_size: u64 = inputs.iter().map(|f| f.size).sum();
385
386 Some(CompactionJob {
387 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
388 inputs,
389 target_level: next_level as u32,
390 priority: CompactionPriority::Normal,
391 estimated_output_size,
392 reason: if max_score > 2.0 {
393 CompactionReason::LevelSizeLimit
394 } else {
395 CompactionReason::VersionPruning
396 },
397 })
398 }
399
400 fn pick_tombstone_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
402 for level in 0..state.level_count() {
404 if let Some(files) = state.files_by_level.get(level) {
405 for file in files {
406 if file.tombstone_density() > 0.5 {
408 let overlapping = state.find_overlapping(
409 level + 1,
410 &file.smallest_key,
411 &file.largest_key,
412 );
413
414 let mut inputs = vec![file.clone()];
415 inputs.extend(overlapping.into_iter().cloned());
416
417 return Some(CompactionJob {
418 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
419 inputs,
420 target_level: (level + 1) as u32,
421 priority: CompactionPriority::Normal,
422 estimated_output_size: file.size / 2, reason: CompactionReason::TombstoneDensity,
424 });
425 }
426 }
427 }
428 }
429
430 None
431 }
432}
433
434impl CompactionPicker for LeveledCompactionPicker {
435 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
436 self.pick_l0_compaction(state)
442 .or_else(|| self.pick_tombstone_compaction(state))
443 .or_else(|| self.pick_level_compaction(state))
444 }
445
446 fn calculate_level_targets(&self, state: &CompactionState) -> Vec<u64> {
447 let mut targets = vec![0u64; state.level_count().max(7)];
448
449 targets[0] = 0;
451
452 targets[1] = self.config.max_bytes_for_level_base;
454
455 for level in 2..targets.len() {
457 targets[level] = (targets[level - 1] as f64
458 * self.config.max_bytes_for_level_multiplier) as u64;
459 }
460
461 targets
462 }
463}
464
465pub struct UniversalCompactionPicker {
467 config: CompactionConfig,
468 job_counter: AtomicU64,
469}
470
471impl UniversalCompactionPicker {
472 pub fn new(config: CompactionConfig) -> Self {
473 Self {
474 config,
475 job_counter: AtomicU64::new(0),
476 }
477 }
478}
479
480impl CompactionPicker for UniversalCompactionPicker {
481 fn pick_compaction(&self, state: &CompactionState) -> Option<CompactionJob> {
482 let all_files: Vec<_> = state.files_by_level.iter().flatten().cloned().collect();
484
485 if all_files.len() < 2 {
486 return None;
487 }
488
489 let mut sorted_files = all_files;
491 sorted_files.sort_by(|a, b| a.created_at.cmp(&b.created_at));
492
493 let size_ratio_threshold = 2.0;
495 let mut inputs = Vec::new();
496 let mut total_size = 0u64;
497
498 for file in sorted_files {
499 if inputs.is_empty() || (total_size as f64 / file.size as f64) < size_ratio_threshold {
500 total_size += file.size;
501 inputs.push(file);
502 } else {
503 break;
504 }
505 }
506
507 if inputs.len() < 2 {
508 return None;
509 }
510
511 Some(CompactionJob {
512 id: self.job_counter.fetch_add(1, AtomicOrdering::SeqCst),
513 inputs,
514 target_level: 0, priority: CompactionPriority::Normal,
516 estimated_output_size: total_size,
517 reason: CompactionReason::LevelSizeLimit,
518 })
519 }
520
521 fn calculate_level_targets(&self, _state: &CompactionState) -> Vec<u64> {
522 vec![]
524 }
525}
526
527pub struct VersionPruner {
531 config: RetentionConfig,
533 active_snapshots: RwLock<HashSet<Timestamp>>,
535 current_time: AtomicU64,
537}
538
539impl VersionPruner {
540 pub fn new(config: RetentionConfig) -> Self {
541 let now = SystemTime::now()
542 .duration_since(UNIX_EPOCH)
543 .map(|d| d.as_millis() as u64)
544 .unwrap_or(0);
545
546 Self {
547 config,
548 active_snapshots: RwLock::new(HashSet::new()),
549 current_time: AtomicU64::new(now),
550 }
551 }
552
553 pub fn register_snapshot(&self, timestamp: Timestamp) {
555 self.active_snapshots.write().insert(timestamp);
556 }
557
558 pub fn unregister_snapshot(&self, timestamp: Timestamp) {
560 self.active_snapshots.write().remove(×tamp);
561 }
562
563 pub fn oldest_snapshot(&self) -> Option<Timestamp> {
565 self.active_snapshots.read().iter().min().copied()
566 }
567
568 pub fn can_prune_version(
578 &self,
579 version_timestamp: Timestamp,
580 is_latest: bool,
581 version_index: usize,
582 ) -> bool {
583 if is_latest {
585 return false;
586 }
587
588 if let Some(oldest) = self.oldest_snapshot() {
591 if version_timestamp <= oldest {
592 return false;
594 }
595 }
596
597 if version_index >= self.config.max_versions_per_key {
599 return true;
600 }
601
602 if let Some(max_age) = self.config.max_version_age {
604 let now = self.current_time.load(AtomicOrdering::Relaxed);
605 let age_ms = now.saturating_sub(version_timestamp);
606 let max_age_ms = max_age.as_millis() as u64;
607
608 if age_ms > max_age_ms {
609 return true;
610 }
611 }
612
613 false
614 }
615
616 pub fn can_collect_tombstone(&self, tombstone_timestamp: Timestamp) -> bool {
622 let now = self.current_time.load(AtomicOrdering::Relaxed);
623 let grace_period_ms = self.config.tombstone_grace_period.as_millis() as u64;
624
625 if now.saturating_sub(tombstone_timestamp) < grace_period_ms {
626 return false;
627 }
628
629 if let Some(oldest) = self.oldest_snapshot() {
631 if tombstone_timestamp <= oldest {
632 return false;
634 }
635 }
636
637 true
638 }
639
640 pub fn update_time(&self) {
642 let now = SystemTime::now()
643 .duration_since(UNIX_EPOCH)
644 .map(|d| d.as_millis() as u64)
645 .unwrap_or(0);
646 self.current_time.store(now, AtomicOrdering::Relaxed);
647 }
648}
649
/// Counters describing compaction activity; all start at zero.
#[derive(Clone, Debug, Default)]
pub struct CompactionStats {
    /// Number of compactions that ran to completion.
    pub compactions_completed: u64,
    /// Bytes read from input files.
    pub bytes_read: u64,
    /// Bytes written to output files.
    pub bytes_written: u64,
    /// Entries examined while compacting.
    pub entries_processed: u64,
    /// Tombstones dropped while compacting.
    pub tombstones_collected: u64,
    /// Old versions dropped while compacting.
    pub versions_pruned: u64,
    /// Write amplification figure; not computed by the code in this file.
    pub write_amplification: f64,
    /// Space amplification figure; not computed by the code in this file.
    pub space_amplification: f64,
}
670
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 1000-entry file without deletions or old versions, covering
    /// the inclusive key range `[smallest, largest]`.
    fn make_file(id: u64, level: u32, size: u64, smallest: &str, largest: &str) -> CompactionFile {
        let smallest_key = smallest.as_bytes().to_owned();
        let largest_key = largest.as_bytes().to_owned();

        CompactionFile {
            id,
            level,
            size,
            smallest_key,
            largest_key,
            num_entries: 1000,
            num_deletions: 0,
            num_old_versions: 0,
            oldest_timestamp: 0,
            newest_timestamp: 100,
            created_at: Instant::now(),
        }
    }

    /// Four level-0 files reach the trigger and must be merged into level 1.
    #[test]
    fn test_leveled_picker_l0() {
        let config = CompactionConfig {
            l0_compaction_trigger: 4,
            ..Default::default()
        };
        let picker = LeveledCompactionPicker::new(config);

        let level0 = vec![
            make_file(1, 0, 1000, "a", "d"),
            make_file(2, 0, 1000, "c", "f"),
            make_file(3, 0, 1000, "e", "h"),
            make_file(4, 0, 1000, "g", "j"),
        ];
        let level1 = vec![make_file(10, 1, 10000, "a", "z")];

        let mut state = CompactionState::default();
        state.files_by_level = vec![level0, level1];

        let job = picker.pick_compaction(&state);
        assert!(job.is_some());

        let job = job.unwrap();
        assert_eq!(job.target_level, 1);
        // Every level-0 file takes part, plus any overlapping level-1 file.
        assert!(job.inputs.len() >= 4);
    }

    /// Sixty deletions out of a hundred entries exceed a density of one half.
    #[test]
    fn test_tombstone_density() {
        let mut file = make_file(1, 0, 1000, "a", "z");
        file.num_entries = 100;
        file.num_deletions = 60;

        assert!(file.tombstone_density() > 0.5);
    }

    /// Exercises latest-version, version-count and snapshot protection rules.
    #[test]
    fn test_version_pruner() {
        let pruner = VersionPruner::new(RetentionConfig {
            max_version_age: Some(Duration::from_secs(3600)),
            max_versions_per_key: 5,
            tombstone_grace_period: Duration::from_secs(60),
            min_file_age: Duration::from_secs(60),
        });

        // The newest version of a key is never prunable.
        assert!(!pruner.can_prune_version(0, true, 0));

        // Beyond the per-key version limit and no snapshot: prunable.
        assert!(pruner.can_prune_version(0, false, 10));

        let snapshot_ts = 1000;
        pruner.register_snapshot(snapshot_ts);

        // Written before the snapshot, so it must be kept.
        assert!(!pruner.can_prune_version(500, false, 10));

        // Written after the snapshot, so the snapshot does not protect it.
        assert!(pruner.can_prune_version(1500, false, 10));
    }

    /// Level targets start at the base size and grow by the multiplier.
    #[test]
    fn test_level_targets() {
        let picker = LeveledCompactionPicker::new(CompactionConfig {
            max_bytes_for_level_base: 256 * 1024 * 1024,
            max_bytes_for_level_multiplier: 10.0,
            ..Default::default()
        });

        let state = CompactionState {
            files_by_level: vec![vec![], vec![], vec![], vec![]],
            size_by_level: vec![0, 0, 0, 0],
            current_time: 0,
            oldest_snapshot: 0,
        };

        let targets = picker.calculate_level_targets(&state);

        assert_eq!(targets[0], 0);
        assert_eq!(targets[1], 256 * 1024 * 1024);
        assert_eq!(targets[2], 2560 * 1024 * 1024);
    }

    /// Priorities compare in order of urgency.
    #[test]
    fn test_compaction_priority() {
        assert!(CompactionPriority::Urgent > CompactionPriority::High);
        assert!(CompactionPriority::High > CompactionPriority::Normal);
        assert!(CompactionPriority::Normal > CompactionPriority::Low);
    }

    /// Key-range overlap for a file covering `d..=h`.
    #[test]
    fn test_file_overlaps() {
        let file = make_file(1, 0, 1000, "d", "h");

        // Ranges that touch or cover the file.
        assert!(file.overlaps(b"a", b"e"));
        assert!(file.overlaps(b"f", b"z"));
        assert!(file.overlaps(b"e", b"g"));
        assert!(file.overlaps(b"a", b"z"));

        // Ranges entirely before or after the file.
        assert!(!file.overlaps(b"a", b"c"));
        assert!(!file.overlaps(b"i", b"z"));
    }
}