1use crate::error::{AmateRSError, ErrorContext, Result};
12use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
13use crate::types::{CipherBlob, Key};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::{Duration, Instant};
19
/// Which family of compaction planning a store intends to use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Merge a level into the next one (see `CompactionPlanner::plan_compaction`).
    LevelBased,
    /// Merge groups of similarly sized SSTables
    /// (see `CompactionPlanner::plan_size_tiered_compaction`).
    SizeTiered,
}

/// Tuning knobs for compaction planning, throttling and tombstone expiry.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Planning strategy to use.
    pub strategy: CompactionStrategy,
    /// Number of L0 SSTables at (or above) which L0 compaction is needed.
    pub l0_threshold: usize,
    /// Growth factor between the target sizes of consecutive levels.
    pub level_multiplier: usize,
    /// Target size of level 1 in bytes; deeper levels scale by `level_multiplier`.
    pub base_level_size: u64,
    /// Upper bound on the summed `file_size` of SSTables picked from a non-L0 level.
    pub max_compaction_bytes: u64,
    /// SSTables smaller than this (in bytes) are ignored by size-tier grouping.
    pub min_sstable_size: u64,
    /// An SSTable joins the current size tier while its size is at most
    /// `size_ratio` times the tier's smallest member.
    pub size_ratio: f64,
    /// Minimum number of SSTables a size tier needs before it is compacted.
    pub min_tier_size: usize,
    /// Output write-rate limit in bytes per second; `0` disables throttling.
    pub max_compaction_bytes_per_sec: u64,
    /// Age after which a registered tombstone counts as expired.
    pub tombstone_ttl: Duration,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);

        CompactionConfig {
            strategy: CompactionStrategy::LevelBased,
            l0_threshold: 4,
            level_multiplier: 10,
            base_level_size: 10 * MIB,
            max_compaction_bytes: 100 * MIB,
            min_sstable_size: 1024,
            size_ratio: 2.0,
            min_tier_size: 4,
            // Zero means "unthrottled".
            max_compaction_bytes_per_sec: 0,
            tombstone_ttl: ONE_WEEK,
        }
    }
}
71
/// A unit of compaction work: the SSTables to merge and the level the result goes to.
#[derive(Debug, Clone)]
pub struct CompactionTask {
    /// Level the `source_sstables` were taken from.
    pub source_level: usize,
    /// Level the merged output SSTables are written to (the planners in this file
    /// always use the source level plus one).
    pub target_level: usize,
    /// SSTables selected from `source_level`.
    pub source_sstables: Vec<SSTableMetadata>,
    /// SSTables already in `target_level` whose key ranges overlap the source
    /// SSTables; size-tiered tasks leave this empty.
    pub target_sstables: Vec<SSTableMetadata>,
}
84
/// Cumulative, thread-safe counters describing compaction work.
///
/// All counters start at zero. The executor in this module only ever increments
/// them, using `Ordering::Relaxed`. Use [`CompactionStats::snapshot`] to read them
/// as plain integers.
#[derive(Default)]
pub struct CompactionStats {
    /// Sum of the `file_size` of every input SSTable read by compactions.
    pub bytes_read: AtomicU64,
    /// Sum of the on-disk sizes of every output SSTable written.
    pub bytes_written: AtomicU64,
    /// Number of input SSTables (source + target) merged.
    pub files_merged: AtomicU64,
    /// Number of compactions that finished successfully.
    pub compactions_completed: AtomicU64,
    /// Total wall-clock time spent in compactions, in milliseconds.
    pub total_duration_ms: AtomicU64,
    /// Number of entries read from input SSTables (duplicates included).
    pub keys_processed: AtomicU64,
    /// Number of tombstone entries dropped from compaction output.
    pub tombstones_removed: AtomicU64,
}

impl std::fmt::Debug for CompactionStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reuse the snapshot so the field list lives in exactly one place.
        let s = self.snapshot();
        f.debug_struct("CompactionStats")
            .field("bytes_read", &s.bytes_read)
            .field("bytes_written", &s.bytes_written)
            .field("files_merged", &s.files_merged)
            .field("compactions_completed", &s.compactions_completed)
            .field("total_duration_ms", &s.total_duration_ms)
            .field("keys_processed", &s.keys_processed)
            .field("tombstones_removed", &s.tombstones_removed)
            .finish()
    }
}

impl CompactionStats {
    /// Returns a point-in-time copy of every counter.
    ///
    /// Each counter is loaded independently with `Ordering::Relaxed`, so under
    /// concurrent updates the snapshot is not guaranteed to be consistent across
    /// fields.
    pub fn snapshot(&self) -> CompactionStatsSnapshot {
        CompactionStatsSnapshot {
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            files_merged: self.files_merged.load(Ordering::Relaxed),
            compactions_completed: self.compactions_completed.load(Ordering::Relaxed),
            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
            keys_processed: self.keys_processed.load(Ordering::Relaxed),
            tombstones_removed: self.tombstones_removed.load(Ordering::Relaxed),
        }
    }
}

/// Plain-integer copy of [`CompactionStats`], as returned by
/// [`CompactionStats::snapshot`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompactionStatsSnapshot {
    /// See [`CompactionStats::bytes_read`].
    pub bytes_read: u64,
    /// See [`CompactionStats::bytes_written`].
    pub bytes_written: u64,
    /// See [`CompactionStats::files_merged`].
    pub files_merged: u64,
    /// See [`CompactionStats::compactions_completed`].
    pub compactions_completed: u64,
    /// See [`CompactionStats::total_duration_ms`].
    pub total_duration_ms: u64,
    /// See [`CompactionStats::keys_processed`].
    pub keys_processed: u64,
    /// See [`CompactionStats::tombstones_removed`].
    pub tombstones_removed: u64,
}
176
/// Blocking rate limiter that keeps compaction output under a bytes-per-second budget.
///
/// Bytes are accounted against a window that starts at `window_start`.
/// - When the bytes in the window would take longer than the elapsed window time at
///   the configured rate, the calling thread sleeps for the difference
///   (`std::thread::sleep`).
/// - A window that is already at least one second old when a new write arrives is
///   discarded first, so idle time cannot turn into credit for a large burst.
#[derive(Debug)]
pub struct CompactionThrottler {
    /// Maximum sustained rate in bytes per second; `0` disables throttling.
    max_bytes_per_sec: u64,
    /// Bytes accounted since `window_start`.
    bytes_in_window: u64,
    /// Start of the current accounting window.
    window_start: Instant,
}

impl CompactionThrottler {
    /// Age at which an accounting window is considered stale and restarted.
    const WINDOW: Duration = Duration::from_secs(1);

    /// Creates a throttler limited to `max_bytes_per_sec` (`0` = unlimited).
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            bytes_in_window: 0,
            window_start: Instant::now(),
        }
    }

    /// Accounts `bytes_written` and sleeps if the current window is over budget.
    ///
    /// Returns immediately when throttling is disabled.
    pub fn throttle(&mut self, bytes_written: u64) {
        if !self.is_enabled() {
            return;
        }

        // Restart a stale window *before* accounting. Otherwise the time elapsed
        // since the last reset counts as allowance, and a write arriving after a
        // long idle period passes unthrottled however large it is.
        if self.window_start.elapsed() >= Self::WINDOW {
            self.bytes_in_window = 0;
            self.window_start = Instant::now();
        }

        self.bytes_in_window = self.bytes_in_window.saturating_add(bytes_written);

        let elapsed_secs = self.window_start.elapsed().as_secs_f64();
        // How long the bytes in this window *should* have taken at the limit.
        let expected_secs = self.bytes_in_window as f64 / self.max_bytes_per_sec as f64;

        if expected_secs > elapsed_secs {
            std::thread::sleep(Duration::from_secs_f64(expected_secs - elapsed_secs));
        }
    }

    /// Returns `true` when a non-zero rate limit is configured.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes_per_sec > 0
    }
}
232
/// A group of similarly sized SSTables built by `CompactionPlanner::group_by_size_tier`.
#[derive(Debug, Clone)]
pub struct SizeTier {
    /// Members of the tier, in ascending `file_size` order.
    pub sstables: Vec<SSTableMetadata>,
    /// Mean `file_size` of the members (integer division).
    pub avg_size: u64,
}
241
242pub struct CompactionPlanner {
244 config: CompactionConfig,
245}
246
247impl CompactionPlanner {
248 pub fn new(config: CompactionConfig) -> Self {
250 Self { config }
251 }
252
253 pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
255 l0_sstable_count >= self.config.l0_threshold
256 }
257
258 pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
260 if level == 0 {
261 return false; }
263
264 let target_size = self.level_target_size(level);
265 level_size > target_size
266 }
267
268 pub fn level_target_size(&self, level: usize) -> u64 {
270 if level == 0 {
271 return 0; }
273
274 self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
275 }
276
277 pub fn plan_compaction(
279 &self,
280 source_level: usize,
281 source_sstables: Vec<SSTableMetadata>,
282 target_sstables: Vec<SSTableMetadata>,
283 ) -> Option<CompactionTask> {
284 if source_sstables.is_empty() {
285 return None;
286 }
287
288 let source_to_compact = if source_level == 0 {
290 source_sstables
291 } else {
292 self.select_sstables_for_compaction(source_sstables)
294 };
295
296 if source_to_compact.is_empty() {
297 return None;
298 }
299
300 let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
302
303 Some(CompactionTask {
304 source_level,
305 target_level: source_level + 1,
306 source_sstables: source_to_compact,
307 target_sstables: target_to_merge,
308 })
309 }
310
311 pub fn plan_size_tiered_compaction(
316 &self,
317 sstables: Vec<SSTableMetadata>,
318 ) -> Option<CompactionTask> {
319 let tiers = self.group_by_size_tier(sstables);
320
321 for tier in tiers {
323 if tier.sstables.len() >= self.config.min_tier_size {
324 let max_level = tier.sstables.iter().map(|s| s.level).max().unwrap_or(0);
327 let target_level = max_level + 1;
328
329 return Some(CompactionTask {
330 source_level: max_level,
331 target_level,
332 source_sstables: tier.sstables,
333 target_sstables: Vec::new(),
334 });
335 }
336 }
337
338 None
339 }
340
341 pub fn group_by_size_tier(&self, mut sstables: Vec<SSTableMetadata>) -> Vec<SizeTier> {
346 if sstables.is_empty() {
347 return Vec::new();
348 }
349
350 sstables.sort_by_key(|s| s.file_size);
352
353 let sstables: Vec<SSTableMetadata> = sstables
355 .into_iter()
356 .filter(|s| s.file_size >= self.config.min_sstable_size)
357 .collect();
358
359 if sstables.is_empty() {
360 return Vec::new();
361 }
362
363 let mut tiers: Vec<SizeTier> = Vec::new();
364 let mut current_tier_sstables: Vec<SSTableMetadata> = Vec::new();
365 let mut tier_min_size: u64 = 0;
366
367 for sstable in sstables {
368 if current_tier_sstables.is_empty() {
369 tier_min_size = sstable.file_size;
370 current_tier_sstables.push(sstable);
371 } else if (sstable.file_size as f64) <= (tier_min_size as f64 * self.config.size_ratio)
372 {
373 current_tier_sstables.push(sstable);
375 } else {
376 let avg_size = current_tier_sstables
378 .iter()
379 .map(|s| s.file_size)
380 .sum::<u64>()
381 / current_tier_sstables.len().max(1) as u64;
382 tiers.push(SizeTier {
383 sstables: std::mem::take(&mut current_tier_sstables),
384 avg_size,
385 });
386 tier_min_size = sstable.file_size;
387 current_tier_sstables.push(sstable);
388 }
389 }
390
391 if !current_tier_sstables.is_empty() {
393 let avg_size = current_tier_sstables
394 .iter()
395 .map(|s| s.file_size)
396 .sum::<u64>()
397 / current_tier_sstables.len().max(1) as u64;
398 tiers.push(SizeTier {
399 sstables: current_tier_sstables,
400 avg_size,
401 });
402 }
403
404 tiers
405 }
406
407 fn select_sstables_for_compaction(
409 &self,
410 sstables: Vec<SSTableMetadata>,
411 ) -> Vec<SSTableMetadata> {
412 let mut selected = Vec::new();
413 let mut total_size = 0u64;
414
415 for sstable in sstables {
416 if total_size + sstable.file_size > self.config.max_compaction_bytes {
417 break;
418 }
419
420 total_size += sstable.file_size;
421 selected.push(sstable);
422
423 if selected.len() >= 2 {
425 break;
426 }
427 }
428
429 selected
430 }
431
432 pub fn find_overlapping_sstables(
434 &self,
435 source_sstables: &[SSTableMetadata],
436 target_sstables: &[SSTableMetadata],
437 ) -> Vec<SSTableMetadata> {
438 if source_sstables.is_empty() {
439 return Vec::new();
440 }
441
442 let min_key = source_sstables
444 .iter()
445 .map(|s| &s.min_key)
446 .min()
447 .expect("source_sstables is non-empty");
448
449 let max_key = source_sstables
450 .iter()
451 .map(|s| &s.max_key)
452 .max()
453 .expect("source_sstables is non-empty");
454
455 target_sstables
457 .iter()
458 .filter(|sstable| {
459 !(&sstable.max_key < min_key || &sstable.min_key > max_key)
461 })
462 .cloned()
463 .collect()
464 }
465}
466
/// A deleted key paired with the time its tombstone was created.
///
/// NOTE(review): this type is not referenced by the code in this file.
/// `CompactionExecutor` tracks tombstones in a `BTreeMap<Key, Instant>` instead;
/// confirm whether it is still used elsewhere.
#[derive(Debug, Clone)]
pub struct TombstoneEntry {
    /// The deleted key.
    pub key: Key,
    /// Creation time of the tombstone; presumably compared against
    /// `CompactionConfig::tombstone_ttl`, as the executor's registry is. TODO confirm.
    pub created_at: Instant,
}
475
476pub struct CompactionExecutor {
478 config: SSTableConfig,
479 compaction_config: CompactionConfig,
480 stats: Arc<CompactionStats>,
481 throttler: CompactionThrottler,
482 tombstones: BTreeMap<Key, Instant>,
484}
485
486impl CompactionExecutor {
487 pub fn new(config: SSTableConfig) -> Self {
489 Self {
490 config,
491 compaction_config: CompactionConfig::default(),
492 stats: Arc::new(CompactionStats::default()),
493 throttler: CompactionThrottler::new(0),
494 tombstones: BTreeMap::new(),
495 }
496 }
497
498 pub fn with_compaction_config(
500 config: SSTableConfig,
501 compaction_config: CompactionConfig,
502 ) -> Self {
503 let throttler = CompactionThrottler::new(compaction_config.max_compaction_bytes_per_sec);
504 Self {
505 config,
506 compaction_config,
507 stats: Arc::new(CompactionStats::default()),
508 throttler,
509 tombstones: BTreeMap::new(),
510 }
511 }
512
513 pub fn register_tombstone(&mut self, key: Key, created_at: Instant) {
515 self.tombstones.insert(key, created_at);
516 }
517
518 fn is_tombstone_expired(&self, key: &Key) -> bool {
520 if let Some(created_at) = self.tombstones.get(key) {
521 created_at.elapsed() >= self.compaction_config.tombstone_ttl
522 } else {
523 false
524 }
525 }
526
527 pub fn execute_compaction(
529 &mut self,
530 task: CompactionTask,
531 output_dir: &Path,
532 next_sstable_id: &mut u64,
533 ) -> Result<Vec<SSTableMetadata>> {
534 let start_time = Instant::now();
535
536 let files_merged = (task.source_sstables.len() + task.target_sstables.len()) as u64;
538 self.stats
539 .files_merged
540 .fetch_add(files_merged, Ordering::Relaxed);
541
542 let _span =
543 tracing::debug_span!("amaters.storage.compaction", files_merged = files_merged,)
544 .entered();
545
546 let mut all_entries: BTreeMap<Key, Option<CipherBlob>> = BTreeMap::new();
548
549 for sstable in &task.source_sstables {
551 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
552 self.stats
553 .bytes_read
554 .fetch_add(sstable.file_size, Ordering::Relaxed);
555 }
556
557 for sstable in &task.target_sstables {
559 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
560 self.stats
561 .bytes_read
562 .fetch_add(sstable.file_size, Ordering::Relaxed);
563 }
564
565 let output_sstables = self.write_compacted_sstables(
567 all_entries,
568 task.target_level,
569 output_dir,
570 next_sstable_id,
571 )?;
572
573 self.stats
574 .compactions_completed
575 .fetch_add(1, Ordering::Relaxed);
576
577 let duration_ms = start_time.elapsed().as_millis() as u64;
578 self.stats
579 .total_duration_ms
580 .fetch_add(duration_ms, Ordering::Relaxed);
581
582 Ok(output_sstables)
583 }
584
585 fn read_sstable_entries(
587 &self,
588 path: &Path,
589 entries: &mut BTreeMap<Key, Option<CipherBlob>>,
590 ) -> Result<()> {
591 let reader = SSTableReader::open(path)?;
592 let sstable_entries = reader.iter()?;
593
594 for (key, value) in sstable_entries {
595 self.stats.keys_processed.fetch_add(1, Ordering::Relaxed);
596 entries.insert(key, Some(value));
598 }
599
600 Ok(())
601 }
602
603 fn write_compacted_sstables(
605 &mut self,
606 entries: BTreeMap<Key, Option<CipherBlob>>,
607 target_level: usize,
608 output_dir: &Path,
609 next_id: &mut u64,
610 ) -> Result<Vec<SSTableMetadata>> {
611 let mut output_sstables = Vec::new();
612 let mut current_writer: Option<SSTableWriter> = None;
613 let mut current_path: Option<PathBuf> = None;
614 let mut current_size = 0usize;
615 let mut current_min_key: Option<Key> = None;
616 let mut current_max_key: Option<Key> = None;
617 let mut current_entries = 0usize;
618
619 const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; for (key, value_opt) in entries {
622 let value = match value_opt {
624 Some(v) => v,
625 None => {
626 if self.is_tombstone_expired(&key) {
628 self.stats
629 .tombstones_removed
630 .fetch_add(1, Ordering::Relaxed);
631 self.tombstones.remove(&key);
633 continue;
634 }
635 self.stats
638 .tombstones_removed
639 .fetch_add(1, Ordering::Relaxed);
640 continue;
641 }
642 };
643
644 if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
646 if let Some(writer) = current_writer.take() {
648 writer.finish()?;
649
650 if let (Some(path), Some(min_key), Some(max_key)) = (
651 current_path.take(),
652 current_min_key.take(),
653 current_max_key.take(),
654 ) {
655 let file_size = std::fs::metadata(&path)
656 .map_err(|e| {
657 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
658 "Failed to get SSTable size: {}",
659 e
660 )))
661 })?
662 .len();
663
664 self.stats
665 .bytes_written
666 .fetch_add(file_size, Ordering::Relaxed);
667 self.throttler.throttle(file_size);
668
669 output_sstables.push(SSTableMetadata {
670 path,
671 min_key,
672 max_key,
673 num_entries: current_entries,
674 file_size,
675 level: target_level,
676 });
677 }
678 }
679
680 let id = *next_id;
682 *next_id += 1;
683 let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
684 let writer = SSTableWriter::new(&path, self.config.clone())?;
685
686 current_writer = Some(writer);
687 current_path = Some(path);
688 current_size = 0;
689 current_min_key = None;
690 current_max_key = None;
691 current_entries = 0;
692 }
693
694 if let Some(ref mut writer) = current_writer {
696 let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
697 writer.add(key.clone(), value)?;
698 current_size += entry_size;
699 current_entries += 1;
700
701 if current_min_key.is_none() {
702 current_min_key = Some(key.clone());
703 }
704 current_max_key = Some(key);
705 }
706 }
707
708 if let Some(writer) = current_writer {
710 writer.finish()?;
711
712 if let (Some(path), Some(min_key), Some(max_key)) =
713 (current_path, current_min_key, current_max_key)
714 {
715 let file_size = std::fs::metadata(&path)
716 .map_err(|e| {
717 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
718 "Failed to get SSTable size: {}",
719 e
720 )))
721 })?
722 .len();
723
724 self.stats
725 .bytes_written
726 .fetch_add(file_size, Ordering::Relaxed);
727 self.throttler.throttle(file_size);
728
729 output_sstables.push(SSTableMetadata {
730 path,
731 min_key,
732 max_key,
733 num_entries: current_entries,
734 file_size,
735 level: target_level,
736 });
737 }
738 }
739
740 Ok(output_sstables)
741 }
742
743 pub fn stats(&self) -> &CompactionStats {
745 &self.stats
746 }
747
748 pub fn stats_snapshot(&self) -> CompactionStatsSnapshot {
750 self.stats.snapshot()
751 }
752}
753
#[cfg(test)]
mod tests {
    // Unit tests for compaction planning, statistics, throttling, tombstone
    // bookkeeping, and end-to-end execution against real SSTable files written to
    // per-process temp directories.
    use super::*;

    #[test]
    fn test_compaction_planner_l0_threshold() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        // Default l0_threshold is 4; compaction is needed at >= 4 files.
        assert!(!planner.needs_l0_compaction(3));
        assert!(planner.needs_l0_compaction(4));
        assert!(planner.needs_l0_compaction(5));
    }

    #[test]
    fn test_compaction_planner_level_sizes() {
        let config = CompactionConfig {
            base_level_size: 10 * 1024 * 1024, // 10 MiB
            level_multiplier: 10,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024); // 10 MiB
        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024); // 100 MiB
        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024); // 1000 MiB
    }

    #[test]
    fn test_compaction_planner_needs_compaction() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        // Level 0 is never size-triggered.
        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));

        // L1 target with defaults is 10 MiB.
        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
    }

    #[test]
    fn test_find_overlapping_sstables() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("key_005"),
            max_key: Key::from_str("key_015"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        // t1 overlaps [key_005, key_015]; t2 does not.
        let target = vec![
            SSTableMetadata {
                path: PathBuf::from("t1.sst"),
                min_key: Key::from_str("key_000"),
                max_key: Key::from_str("key_010"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
            SSTableMetadata {
                path: PathBuf::from("t2.sst"),
                min_key: Key::from_str("key_020"),
                max_key: Key::from_str("key_030"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
        ];

        let overlapping = planner.find_overlapping_sstables(&source, &target);

        assert_eq!(overlapping.len(), 1);
        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
    }

    // --- Size-tiered planning ---

    #[test]
    fn test_size_tiered_grouping_basic() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // All sizes are within 2x of the smallest (1000), so one tier.
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);

        assert_eq!(tiers.len(), 1);
        assert_eq!(tiers[0].sstables.len(), 4);
    }

    #[test]
    fn test_size_tiered_grouping_multiple_tiers() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // 10000 > 2 * 1000, so the big files start a second tier.
        let sstables = vec![
            make_metadata("small1.sst", 1000, 0),
            make_metadata("small2.sst", 1500, 0),
            make_metadata("big1.sst", 10000, 0),
            make_metadata("big2.sst", 15000, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);

        assert_eq!(tiers.len(), 2);
        assert_eq!(tiers[0].sstables.len(), 2); // small tier
        assert_eq!(tiers[1].sstables.len(), 2); // big tier
    }

    #[test]
    fn test_size_tiered_merge_trigger() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // Three files: below min_tier_size.
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_none(), "Should not trigger with only 3 SSTables");

        // Four files: reaches min_tier_size.
        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(
            task.is_some(),
            "Should trigger with 4 SSTables in same tier"
        );

        let task = task.expect("task should be Some");
        assert_eq!(task.source_sstables.len(), 4);
        assert_eq!(task.target_level, 1);
    }

    #[test]
    fn test_size_tiered_filters_small_sstables() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 500,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // Every file is below min_sstable_size (500).
        let sstables = vec![
            make_metadata("a.sst", 100, 0),
            make_metadata("b.sst", 200, 0),
            make_metadata("c.sst", 300, 0),
            make_metadata("d.sst", 400, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);
        assert!(tiers.is_empty());
    }

    // --- Statistics ---

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        let snapshot = stats.snapshot();

        assert_eq!(snapshot.bytes_read, 0);
        assert_eq!(snapshot.bytes_written, 0);
        assert_eq!(snapshot.files_merged, 0);
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.total_duration_ms, 0);
        assert_eq!(snapshot.keys_processed, 0);
        assert_eq!(snapshot.tombstones_removed, 0);
    }

    #[test]
    fn test_compaction_stats_atomic_updates() {
        let stats = CompactionStats::default();

        stats.bytes_read.fetch_add(1000, Ordering::Relaxed);
        stats.bytes_written.fetch_add(500, Ordering::Relaxed);
        stats.files_merged.fetch_add(3, Ordering::Relaxed);
        stats.compactions_completed.fetch_add(1, Ordering::Relaxed);
        stats.total_duration_ms.fetch_add(42, Ordering::Relaxed);
        stats.keys_processed.fetch_add(100, Ordering::Relaxed);
        stats.tombstones_removed.fetch_add(5, Ordering::Relaxed);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.bytes_written, 500);
        assert_eq!(snapshot.files_merged, 3);
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.total_duration_ms, 42);
        assert_eq!(snapshot.keys_processed, 100);
        assert_eq!(snapshot.tombstones_removed, 5);
    }

    #[test]
    fn test_compaction_stats_thread_safety() {
        let stats = Arc::new(CompactionStats::default());

        // 10 threads x 100 increments each.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let stats_clone = Arc::clone(&stats);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        stats_clone.bytes_read.fetch_add(1, Ordering::Relaxed);
                        stats_clone.keys_processed.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread should complete");
        }

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.keys_processed, 1000);
    }

    // --- Throttling ---

    #[test]
    fn test_throttler_disabled() {
        let mut throttler = CompactionThrottler::new(0);
        assert!(!throttler.is_enabled());

        let start = Instant::now();
        // A disabled throttler must return immediately, however large the write.
        throttler.throttle(1_000_000);
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50));
    }

    #[test]
    fn test_throttler_enabled() {
        let mut throttler = CompactionThrottler::new(10_000); // 10 KB/s
        assert!(throttler.is_enabled());

        let start = Instant::now();
        // 20 KB at 10 KB/s should take about 2 s.
        throttler.throttle(20_000);
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_millis(1500),
            "Expected >= 1.5s delay, got {:?}",
            elapsed
        );
    }

    #[test]
    fn test_throttler_small_writes_no_delay() {
        let mut throttler = CompactionThrottler::new(1_000_000); // 1 MB/s
        assert!(throttler.is_enabled());

        let start = Instant::now();
        // 100 bytes at 1 MB/s is a negligible delay.
        throttler.throttle(100);
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50));
    }

    // --- Tombstones ---

    #[test]
    fn test_tombstone_registration() {
        let config = SSTableConfig::default();
        let mut executor = CompactionExecutor::new(config);

        let key = Key::from_str("test_key");
        let created_at = Instant::now();
        executor.register_tombstone(key.clone(), created_at);

        // Default TTL is one week, so a fresh tombstone is not expired.
        assert!(!executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_tombstone_expiry_with_short_ttl() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_millis(1), // 1 ms TTL
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);

        let key = Key::from_str("expired_key");
        // Backdate the tombstone past the 1 ms TTL.
        let old_time = Instant::now() - Duration::from_millis(10);
        executor.register_tombstone(key.clone(), old_time);

        assert!(executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_secs(3600), // 1 hour TTL
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);

        let key = Key::from_str("fresh_key");
        executor.register_tombstone(key.clone(), Instant::now());

        assert!(!executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_unknown_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let executor = CompactionExecutor::new(config);

        // A key that was never registered is never considered expired.
        let key = Key::from_str("unknown_key");
        assert!(!executor.is_tombstone_expired(&key));
    }

    // --- Planner edge cases ---

    #[test]
    fn test_plan_compaction_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let task = planner.plan_compaction(0, Vec::new(), Vec::new());
        assert!(task.is_none());
    }

    #[test]
    fn test_plan_compaction_single_sstable() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("single.sst"),
            min_key: Key::from_str("key_001"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        // From L0, every given SSTable is taken.
        let task = planner.plan_compaction(0, source, Vec::new());
        assert!(task.is_some());
        let task = task.expect("task should be Some for L0");
        assert_eq!(task.source_sstables.len(), 1);
    }

    #[test]
    fn test_size_tiered_empty_input() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let task = planner.plan_size_tiered_compaction(Vec::new());
        assert!(task.is_none());
    }

    #[test]
    fn test_find_overlapping_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("key_000"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];

        let overlapping = planner.find_overlapping_sstables(&[], &target);
        assert!(overlapping.is_empty());
    }

    #[test]
    fn test_find_overlapping_no_overlap() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("aaa"),
            max_key: Key::from_str("bbb"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("zzz_000"),
            max_key: Key::from_str("zzz_999"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];

        let overlapping = planner.find_overlapping_sstables(&source, &target);
        assert!(overlapping.is_empty());
    }

    #[test]
    fn test_compaction_config_defaults() {
        let config = CompactionConfig::default();
        assert_eq!(config.strategy, CompactionStrategy::LevelBased);
        assert_eq!(config.l0_threshold, 4);
        assert_eq!(config.level_multiplier, 10);
        assert_eq!(config.min_sstable_size, 1024);
        assert_eq!(config.size_ratio, 2.0);
        assert_eq!(config.min_tier_size, 4);
        assert_eq!(config.max_compaction_bytes_per_sec, 0);
        assert_eq!(config.tombstone_ttl, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn test_executor_stats_accessible() {
        let executor = CompactionExecutor::new(SSTableConfig::default());
        let snapshot = executor.stats_snapshot();
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.bytes_read, 0);
    }

    #[test]
    fn test_size_tiered_preserves_level_info() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // Both files live at level 2, so the task should go 2 -> 3.
        let sstables = vec![
            make_metadata("a.sst", 1000, 2),
            make_metadata("b.sst", 1500, 2),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_some());
        let task = task.expect("task should be Some");
        assert_eq!(task.source_level, 2);
        assert_eq!(task.target_level, 3);
    }

    #[test]
    fn test_level_target_size_l0() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        assert_eq!(planner.level_target_size(0), 0);
    }

    // --- End-to-end execution (writes real files under the system temp dir) ---

    #[test]
    fn test_executor_compaction_with_stats() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_compaction_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");

        let sstable_config = SSTableConfig::default();

        // Two L0 inputs with disjoint keys.
        let path1 = temp_dir.join("L0_00000001.sst");
        let path2 = temp_dir.join("L0_00000002.sst");

        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );
        create_test_sstable(
            &path2,
            &sstable_config,
            &[("key_03", "val_03"), ("key_04", "val_04")],
        );

        let meta1 = make_file_metadata(&path1, 0);
        let meta2 = make_file_metadata(&path2, 0);

        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1, meta2],
            target_sstables: Vec::new(),
        };

        let mut executor = CompactionExecutor::new(sstable_config);
        let mut next_id = 100u64;

        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        assert!(result.is_ok(), "compaction should succeed");

        let output = result.expect("should have output");
        assert!(!output.is_empty(), "should produce output SSTables");

        let snapshot = executor.stats_snapshot();
        assert!(snapshot.bytes_read > 0, "should have read bytes");
        assert!(snapshot.bytes_written > 0, "should have written bytes");
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.files_merged, 2);
        assert!(
            snapshot.keys_processed >= 4,
            "should have processed at least 4 keys"
        );
        assert!(
            snapshot.total_duration_ms < 10_000,
            "should complete quickly"
        );

        // Best-effort cleanup.
        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    #[test]
    fn test_executor_compaction_with_throttling() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_throttle_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");

        let sstable_config = SSTableConfig::default();

        let path1 = temp_dir.join("L0_00000001.sst");
        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );

        let meta1 = make_file_metadata(&path1, 0);

        let compaction_config = CompactionConfig {
            max_compaction_bytes_per_sec: 0, // throttling disabled
            ..Default::default()
        };

        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1],
            target_sstables: Vec::new(),
        };

        let mut executor =
            CompactionExecutor::with_compaction_config(sstable_config, compaction_config);
        let mut next_id = 200u64;

        let start = Instant::now();
        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        // With throttling disabled this should be far below the bound.
        assert!(elapsed < Duration::from_secs(5));

        // Best-effort cleanup.
        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    /// Builds in-memory metadata for planner tests; no file is created.
    fn make_metadata(name: &str, file_size: u64, level: usize) -> SSTableMetadata {
        SSTableMetadata {
            path: PathBuf::from(name),
            min_key: Key::from_str(&format!("{}_min", name)),
            max_key: Key::from_str(&format!("{}_max", name)),
            num_entries: 10,
            file_size,
            level,
        }
    }

    /// Writes an SSTable at `path` containing `entries` (given as string pairs).
    fn create_test_sstable(path: &Path, config: &SSTableConfig, entries: &[(&str, &str)]) {
        let mut writer =
            SSTableWriter::new(path, config.clone()).expect("should create SSTable writer");
        for (k, v) in entries {
            let key = Key::from_str(k);
            let value = CipherBlob::new(v.as_bytes().to_vec());
            writer.add(key, value).expect("should add entry");
        }
        writer.finish().expect("should finish writing");
    }

    /// Builds metadata for an existing SSTable by reading its size and entries back.
    fn make_file_metadata(path: &Path, level: usize) -> SSTableMetadata {
        let file_size = std::fs::metadata(path)
            .expect("SSTable file should exist")
            .len();

        // Key bounds come from the first and last entries read back from the file.
        let reader = SSTableReader::open(path).expect("should open SSTable");
        let entries = reader.iter().expect("should read entries");

        let min_key = entries
            .first()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));
        let max_key = entries
            .last()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));

        SSTableMetadata {
            path: path.to_path_buf(),
            min_key,
            max_key,
            num_entries: entries.len(),
            file_size,
            level,
        }
    }
}