1use crate::error::{AmateRSError, ErrorContext, Result};
12use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
13use crate::types::{CipherBlob, Key};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::{Duration, Instant};
19
/// Which algorithm the store uses to schedule compactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionStrategy {
    /// Leveled compaction: each level has a byte budget; overflowing ranges
    /// are merged into the next level.
    LevelBased,
    /// Size-tiered compaction: SSTables of similar size are merged together.
    SizeTiered,
}

/// Tuning knobs shared by the compaction planner and executor.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Strategy used to pick compaction work.
    pub strategy: CompactionStrategy,
    /// Number of L0 SSTables that triggers an L0 compaction.
    pub l0_threshold: usize,
    /// Size growth factor between consecutive levels.
    pub level_multiplier: usize,
    /// Byte budget for L1; level N's budget is `base * multiplier^(N-1)`.
    pub base_level_size: u64,
    /// Upper bound on bytes selected into a single compaction.
    pub max_compaction_bytes: u64,
    /// SSTables smaller than this are ignored by size-tiered grouping.
    pub min_sstable_size: u64,
    /// Maximum size ratio between SSTables grouped into the same tier.
    pub size_ratio: f64,
    /// Minimum SSTables in a tier before size-tiered compaction triggers.
    pub min_tier_size: usize,
    /// Write-rate cap for compaction output; 0 disables throttling.
    pub max_compaction_bytes_per_sec: u64,
    /// How long delete markers are retained before they may be dropped.
    pub tombstone_ttl: Duration,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            strategy: CompactionStrategy::LevelBased,
            l0_threshold: 4,
            level_multiplier: 10,
            base_level_size: 10 * 1024 * 1024,       // 10 MiB L1 budget
            max_compaction_bytes: 100 * 1024 * 1024, // 100 MiB per compaction
            min_sstable_size: 1024,
            size_ratio: 2.0,
            min_tier_size: 4,
            max_compaction_bytes_per_sec: 0, // unthrottled by default
            tombstone_ttl: Duration::from_secs(7 * 24 * 3600), // one week
        }
    }
}
71
/// A unit of compaction work: merge `source_sstables` from `source_level`
/// with the overlapping `target_sstables` already living in `target_level`.
#[derive(Debug, Clone)]
pub struct CompactionTask {
    /// Level the input SSTables are taken from.
    pub source_level: usize,
    /// Level the merged output will be written to.
    pub target_level: usize,
    /// SSTables selected from the source level.
    pub source_sstables: Vec<SSTableMetadata>,
    /// SSTables in the target level whose key ranges overlap the sources
    /// and therefore must be merged into the output.
    pub target_sstables: Vec<SSTableMetadata>,
}
84
/// Cumulative compaction metrics, updated atomically so the struct can be
/// shared across threads behind an `Arc` without locking.
///
/// `Debug` and `Default` are derived: `AtomicU64` defaults to zero and its
/// `Debug` impl prints the current value, which matches the behavior of the
/// previous hand-written impls exactly.
#[derive(Debug, Default)]
pub struct CompactionStats {
    /// Total bytes read from input SSTables.
    pub bytes_read: AtomicU64,
    /// Total bytes written to output SSTables.
    pub bytes_written: AtomicU64,
    /// Total number of input SSTable files merged.
    pub files_merged: AtomicU64,
    /// Number of compactions that ran to completion.
    pub compactions_completed: AtomicU64,
    /// Wall-clock time spent compacting, in milliseconds.
    pub total_duration_ms: AtomicU64,
    /// Number of key/value entries read during compaction.
    pub keys_processed: AtomicU64,
    /// Number of tombstones dropped from compaction output.
    pub tombstones_removed: AtomicU64,
}

impl CompactionStats {
    /// Takes a point-in-time plain-value copy of all counters.
    ///
    /// Loads are `Relaxed`, so counters updated concurrently may be slightly
    /// out of sync with one another within a single snapshot.
    pub fn snapshot(&self) -> CompactionStatsSnapshot {
        CompactionStatsSnapshot {
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            files_merged: self.files_merged.load(Ordering::Relaxed),
            compactions_completed: self.compactions_completed.load(Ordering::Relaxed),
            total_duration_ms: self.total_duration_ms.load(Ordering::Relaxed),
            keys_processed: self.keys_processed.load(Ordering::Relaxed),
            tombstones_removed: self.tombstones_removed.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of [`CompactionStats`] produced by
/// [`CompactionStats::snapshot`].
#[derive(Debug, Clone, Default)]
pub struct CompactionStatsSnapshot {
    /// Total bytes read from input SSTables.
    pub bytes_read: u64,
    /// Total bytes written to output SSTables.
    pub bytes_written: u64,
    /// Total number of input SSTable files merged.
    pub files_merged: u64,
    /// Number of compactions that ran to completion.
    pub compactions_completed: u64,
    /// Wall-clock time spent compacting, in milliseconds.
    pub total_duration_ms: u64,
    /// Number of key/value entries read during compaction.
    pub keys_processed: u64,
    /// Number of tombstones dropped from compaction output.
    pub tombstones_removed: u64,
}
176
/// Simple windowed rate limiter for compaction writes.
///
/// Sleeps the calling thread just long enough to keep the observed write
/// rate at or below `max_bytes_per_sec`. A rate of 0 disables throttling.
#[derive(Debug)]
pub struct CompactionThrottler {
    /// Allowed write rate in bytes per second; 0 means unthrottled.
    max_bytes_per_sec: u64,
    /// Bytes accounted since `window_start`.
    bytes_in_window: u64,
    /// Start of the current accounting window.
    window_start: Instant,
}

impl CompactionThrottler {
    /// Creates a throttler; pass 0 to disable throttling entirely.
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            bytes_in_window: 0,
            window_start: Instant::now(),
        }
    }

    /// Accounts `bytes_written` and sleeps if the window's write rate
    /// exceeds the configured limit. No-op when throttling is disabled.
    pub fn throttle(&mut self, bytes_written: u64) {
        if self.max_bytes_per_sec == 0 {
            return;
        }

        self.bytes_in_window += bytes_written;

        let elapsed_secs = self.window_start.elapsed().as_secs_f64();
        let expected_secs = self.bytes_in_window as f64 / self.max_bytes_per_sec as f64;

        // Sleep until enough wall time has passed for the bytes written.
        if expected_secs > elapsed_secs {
            let sleep_duration = Duration::from_secs_f64(expected_secs - elapsed_secs);
            std::thread::sleep(sleep_duration);
        }

        // Re-read the clock for the window-reset check. The previous code
        // reused the pre-sleep elapsed time here, so a sleep that carried us
        // past the 1s window boundary never reset the window: stale bytes
        // stayed accounted against the next window and later writes were
        // over-throttled.
        if self.window_start.elapsed().as_secs_f64() >= 1.0 {
            self.bytes_in_window = 0;
            self.window_start = Instant::now();
        }
    }

    /// Whether a non-zero rate limit is configured.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes_per_sec > 0
    }
}
232
/// A group of SSTables whose file sizes fall within `size_ratio` of each
/// other, as produced by `CompactionPlanner::group_by_size_tier`.
#[derive(Debug, Clone)]
pub struct SizeTier {
    /// Members of the tier, in ascending file-size order (the grouping pass
    /// sorts by `file_size` before partitioning).
    pub sstables: Vec<SSTableMetadata>,
    /// Mean file size of the members, in bytes.
    pub avg_size: u64,
}
241
/// Decides when compaction is needed and which SSTables to merge.
/// Holds only configuration — it performs no I/O itself.
pub struct CompactionPlanner {
    // Tuning knobs shared with the executor.
    config: CompactionConfig,
}
246
247impl CompactionPlanner {
248 pub fn new(config: CompactionConfig) -> Self {
250 Self { config }
251 }
252
253 pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
255 l0_sstable_count >= self.config.l0_threshold
256 }
257
258 pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
260 if level == 0 {
261 return false; }
263
264 let target_size = self.level_target_size(level);
265 level_size > target_size
266 }
267
268 pub fn level_target_size(&self, level: usize) -> u64 {
270 if level == 0 {
271 return 0; }
273
274 self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
275 }
276
277 pub fn plan_compaction(
279 &self,
280 source_level: usize,
281 source_sstables: Vec<SSTableMetadata>,
282 target_sstables: Vec<SSTableMetadata>,
283 ) -> Option<CompactionTask> {
284 if source_sstables.is_empty() {
285 return None;
286 }
287
288 let source_to_compact = if source_level == 0 {
290 source_sstables
291 } else {
292 self.select_sstables_for_compaction(source_sstables)
294 };
295
296 if source_to_compact.is_empty() {
297 return None;
298 }
299
300 let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
302
303 Some(CompactionTask {
304 source_level,
305 target_level: source_level + 1,
306 source_sstables: source_to_compact,
307 target_sstables: target_to_merge,
308 })
309 }
310
311 pub fn plan_size_tiered_compaction(
316 &self,
317 sstables: Vec<SSTableMetadata>,
318 ) -> Option<CompactionTask> {
319 let tiers = self.group_by_size_tier(sstables);
320
321 for tier in tiers {
323 if tier.sstables.len() >= self.config.min_tier_size {
324 let max_level = tier.sstables.iter().map(|s| s.level).max().unwrap_or(0);
327 let target_level = max_level + 1;
328
329 return Some(CompactionTask {
330 source_level: max_level,
331 target_level,
332 source_sstables: tier.sstables,
333 target_sstables: Vec::new(),
334 });
335 }
336 }
337
338 None
339 }
340
341 pub fn group_by_size_tier(&self, mut sstables: Vec<SSTableMetadata>) -> Vec<SizeTier> {
346 if sstables.is_empty() {
347 return Vec::new();
348 }
349
350 sstables.sort_by_key(|s| s.file_size);
352
353 let sstables: Vec<SSTableMetadata> = sstables
355 .into_iter()
356 .filter(|s| s.file_size >= self.config.min_sstable_size)
357 .collect();
358
359 if sstables.is_empty() {
360 return Vec::new();
361 }
362
363 let mut tiers: Vec<SizeTier> = Vec::new();
364 let mut current_tier_sstables: Vec<SSTableMetadata> = Vec::new();
365 let mut tier_min_size: u64 = 0;
366
367 for sstable in sstables {
368 if current_tier_sstables.is_empty() {
369 tier_min_size = sstable.file_size;
370 current_tier_sstables.push(sstable);
371 } else if (sstable.file_size as f64) <= (tier_min_size as f64 * self.config.size_ratio)
372 {
373 current_tier_sstables.push(sstable);
375 } else {
376 let avg_size = current_tier_sstables
378 .iter()
379 .map(|s| s.file_size)
380 .sum::<u64>()
381 / current_tier_sstables.len().max(1) as u64;
382 tiers.push(SizeTier {
383 sstables: std::mem::take(&mut current_tier_sstables),
384 avg_size,
385 });
386 tier_min_size = sstable.file_size;
387 current_tier_sstables.push(sstable);
388 }
389 }
390
391 if !current_tier_sstables.is_empty() {
393 let avg_size = current_tier_sstables
394 .iter()
395 .map(|s| s.file_size)
396 .sum::<u64>()
397 / current_tier_sstables.len().max(1) as u64;
398 tiers.push(SizeTier {
399 sstables: current_tier_sstables,
400 avg_size,
401 });
402 }
403
404 tiers
405 }
406
407 fn select_sstables_for_compaction(
409 &self,
410 sstables: Vec<SSTableMetadata>,
411 ) -> Vec<SSTableMetadata> {
412 let mut selected = Vec::new();
413 let mut total_size = 0u64;
414
415 for sstable in sstables {
416 if total_size + sstable.file_size > self.config.max_compaction_bytes {
417 break;
418 }
419
420 total_size += sstable.file_size;
421 selected.push(sstable);
422
423 if selected.len() >= 2 {
425 break;
426 }
427 }
428
429 selected
430 }
431
432 pub fn find_overlapping_sstables(
434 &self,
435 source_sstables: &[SSTableMetadata],
436 target_sstables: &[SSTableMetadata],
437 ) -> Vec<SSTableMetadata> {
438 if source_sstables.is_empty() {
439 return Vec::new();
440 }
441
442 let min_key = source_sstables
444 .iter()
445 .map(|s| &s.min_key)
446 .min()
447 .expect("source_sstables is non-empty");
448
449 let max_key = source_sstables
450 .iter()
451 .map(|s| &s.max_key)
452 .max()
453 .expect("source_sstables is non-empty");
454
455 target_sstables
457 .iter()
458 .filter(|sstable| {
459 !(&sstable.max_key < min_key || &sstable.min_key > max_key)
461 })
462 .cloned()
463 .collect()
464 }
465}
466
/// A delete marker paired with its creation time, for checking the marker's
/// age against `CompactionConfig::tombstone_ttl`.
// NOTE(review): nothing in this file constructs TombstoneEntry —
// CompactionExecutor tracks tombstones in a BTreeMap<Key, Instant> instead.
// Presumably part of the public API for external callers; confirm before
// removing.
#[derive(Debug, Clone)]
pub struct TombstoneEntry {
    /// The deleted key.
    pub key: Key,
    /// When the tombstone was created.
    pub created_at: Instant,
}
475
/// Performs compactions: reads input SSTables, merges their entries, drops
/// tombstones, and writes rate-limited output SSTables.
pub struct CompactionExecutor {
    // SSTable encoding settings, cloned into each output writer.
    config: SSTableConfig,
    // Compaction tuning (tombstone TTL, throttling rate, ...).
    compaction_config: CompactionConfig,
    // Shared counters; Arc so callers can observe progress concurrently.
    stats: Arc<CompactionStats>,
    // Rate limiter invoked after each output SSTable is finished.
    throttler: CompactionThrottler,
    // Registered delete markers and their creation times, keyed by key.
    tombstones: BTreeMap<Key, Instant>,
}
485
486impl CompactionExecutor {
487 pub fn new(config: SSTableConfig) -> Self {
489 Self {
490 config,
491 compaction_config: CompactionConfig::default(),
492 stats: Arc::new(CompactionStats::default()),
493 throttler: CompactionThrottler::new(0),
494 tombstones: BTreeMap::new(),
495 }
496 }
497
498 pub fn with_compaction_config(
500 config: SSTableConfig,
501 compaction_config: CompactionConfig,
502 ) -> Self {
503 let throttler = CompactionThrottler::new(compaction_config.max_compaction_bytes_per_sec);
504 Self {
505 config,
506 compaction_config,
507 stats: Arc::new(CompactionStats::default()),
508 throttler,
509 tombstones: BTreeMap::new(),
510 }
511 }
512
513 pub fn register_tombstone(&mut self, key: Key, created_at: Instant) {
515 self.tombstones.insert(key, created_at);
516 }
517
518 fn is_tombstone_expired(&self, key: &Key) -> bool {
520 if let Some(created_at) = self.tombstones.get(key) {
521 created_at.elapsed() >= self.compaction_config.tombstone_ttl
522 } else {
523 false
524 }
525 }
526
527 pub fn execute_compaction(
529 &mut self,
530 task: CompactionTask,
531 output_dir: &Path,
532 next_sstable_id: &mut u64,
533 ) -> Result<Vec<SSTableMetadata>> {
534 let start_time = Instant::now();
535
536 let files_merged = (task.source_sstables.len() + task.target_sstables.len()) as u64;
538 self.stats
539 .files_merged
540 .fetch_add(files_merged, Ordering::Relaxed);
541
542 let mut all_entries: BTreeMap<Key, Option<CipherBlob>> = BTreeMap::new();
544
545 for sstable in &task.source_sstables {
547 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
548 self.stats
549 .bytes_read
550 .fetch_add(sstable.file_size, Ordering::Relaxed);
551 }
552
553 for sstable in &task.target_sstables {
555 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
556 self.stats
557 .bytes_read
558 .fetch_add(sstable.file_size, Ordering::Relaxed);
559 }
560
561 let output_sstables = self.write_compacted_sstables(
563 all_entries,
564 task.target_level,
565 output_dir,
566 next_sstable_id,
567 )?;
568
569 self.stats
570 .compactions_completed
571 .fetch_add(1, Ordering::Relaxed);
572
573 let duration_ms = start_time.elapsed().as_millis() as u64;
574 self.stats
575 .total_duration_ms
576 .fetch_add(duration_ms, Ordering::Relaxed);
577
578 Ok(output_sstables)
579 }
580
581 fn read_sstable_entries(
583 &self,
584 path: &Path,
585 entries: &mut BTreeMap<Key, Option<CipherBlob>>,
586 ) -> Result<()> {
587 let reader = SSTableReader::open(path)?;
588 let sstable_entries = reader.iter()?;
589
590 for (key, value) in sstable_entries {
591 self.stats.keys_processed.fetch_add(1, Ordering::Relaxed);
592 entries.insert(key, Some(value));
594 }
595
596 Ok(())
597 }
598
599 fn write_compacted_sstables(
601 &mut self,
602 entries: BTreeMap<Key, Option<CipherBlob>>,
603 target_level: usize,
604 output_dir: &Path,
605 next_id: &mut u64,
606 ) -> Result<Vec<SSTableMetadata>> {
607 let mut output_sstables = Vec::new();
608 let mut current_writer: Option<SSTableWriter> = None;
609 let mut current_path: Option<PathBuf> = None;
610 let mut current_size = 0usize;
611 let mut current_min_key: Option<Key> = None;
612 let mut current_max_key: Option<Key> = None;
613 let mut current_entries = 0usize;
614
615 const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; for (key, value_opt) in entries {
618 let value = match value_opt {
620 Some(v) => v,
621 None => {
622 if self.is_tombstone_expired(&key) {
624 self.stats
625 .tombstones_removed
626 .fetch_add(1, Ordering::Relaxed);
627 self.tombstones.remove(&key);
629 continue;
630 }
631 self.stats
634 .tombstones_removed
635 .fetch_add(1, Ordering::Relaxed);
636 continue;
637 }
638 };
639
640 if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
642 if let Some(writer) = current_writer.take() {
644 writer.finish()?;
645
646 if let (Some(path), Some(min_key), Some(max_key)) = (
647 current_path.take(),
648 current_min_key.take(),
649 current_max_key.take(),
650 ) {
651 let file_size = std::fs::metadata(&path)
652 .map_err(|e| {
653 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
654 "Failed to get SSTable size: {}",
655 e
656 )))
657 })?
658 .len();
659
660 self.stats
661 .bytes_written
662 .fetch_add(file_size, Ordering::Relaxed);
663 self.throttler.throttle(file_size);
664
665 output_sstables.push(SSTableMetadata {
666 path,
667 min_key,
668 max_key,
669 num_entries: current_entries,
670 file_size,
671 level: target_level,
672 });
673 }
674 }
675
676 let id = *next_id;
678 *next_id += 1;
679 let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
680 let writer = SSTableWriter::new(&path, self.config.clone())?;
681
682 current_writer = Some(writer);
683 current_path = Some(path);
684 current_size = 0;
685 current_min_key = None;
686 current_max_key = None;
687 current_entries = 0;
688 }
689
690 if let Some(ref mut writer) = current_writer {
692 let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
693 writer.add(key.clone(), value)?;
694 current_size += entry_size;
695 current_entries += 1;
696
697 if current_min_key.is_none() {
698 current_min_key = Some(key.clone());
699 }
700 current_max_key = Some(key);
701 }
702 }
703
704 if let Some(writer) = current_writer {
706 writer.finish()?;
707
708 if let (Some(path), Some(min_key), Some(max_key)) =
709 (current_path, current_min_key, current_max_key)
710 {
711 let file_size = std::fs::metadata(&path)
712 .map_err(|e| {
713 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
714 "Failed to get SSTable size: {}",
715 e
716 )))
717 })?
718 .len();
719
720 self.stats
721 .bytes_written
722 .fetch_add(file_size, Ordering::Relaxed);
723 self.throttler.throttle(file_size);
724
725 output_sstables.push(SSTableMetadata {
726 path,
727 min_key,
728 max_key,
729 num_entries: current_entries,
730 file_size,
731 level: target_level,
732 });
733 }
734 }
735
736 Ok(output_sstables)
737 }
738
739 pub fn stats(&self) -> &CompactionStats {
741 &self.stats
742 }
743
744 pub fn stats_snapshot(&self) -> CompactionStatsSnapshot {
746 self.stats.snapshot()
747 }
748}
749
#[cfg(test)]
mod tests {
    use super::*;

    // --- CompactionPlanner: level-based planning ---

    #[test]
    fn test_compaction_planner_l0_threshold() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        assert!(!planner.needs_l0_compaction(3));
        assert!(planner.needs_l0_compaction(4));
        assert!(planner.needs_l0_compaction(5));
    }

    #[test]
    fn test_compaction_planner_level_sizes() {
        let config = CompactionConfig {
            base_level_size: 10 * 1024 * 1024,
            level_multiplier: 10,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        // Each level's budget grows by the multiplier.
        assert_eq!(planner.level_target_size(1), 10 * 1024 * 1024);
        assert_eq!(planner.level_target_size(2), 100 * 1024 * 1024);
        assert_eq!(planner.level_target_size(3), 1000 * 1024 * 1024);
    }

    #[test]
    fn test_compaction_planner_needs_compaction() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        // L0 is count-driven, never size-driven.
        assert!(!planner.needs_level_compaction(0, 100 * 1024 * 1024));

        assert!(!planner.needs_level_compaction(1, 5 * 1024 * 1024));
        assert!(planner.needs_level_compaction(1, 15 * 1024 * 1024));
    }

    #[test]
    fn test_find_overlapping_sstables() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("key_005"),
            max_key: Key::from_str("key_015"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        let target = vec![
            SSTableMetadata {
                path: PathBuf::from("t1.sst"),
                min_key: Key::from_str("key_000"),
                max_key: Key::from_str("key_010"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
            SSTableMetadata {
                path: PathBuf::from("t2.sst"),
                min_key: Key::from_str("key_020"),
                max_key: Key::from_str("key_030"),
                num_entries: 10,
                file_size: 1000,
                level: 1,
            },
        ];

        let overlapping = planner.find_overlapping_sstables(&source, &target);

        assert_eq!(overlapping.len(), 1);
        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
    }

    // --- CompactionPlanner: size-tiered planning ---

    #[test]
    fn test_size_tiered_grouping_basic() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);

        // All four sizes are within 2x of the smallest: one tier.
        assert_eq!(tiers.len(), 1);
        assert_eq!(tiers[0].sstables.len(), 4);
    }

    #[test]
    fn test_size_tiered_grouping_multiple_tiers() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let sstables = vec![
            make_metadata("small1.sst", 1000, 0),
            make_metadata("small2.sst", 1500, 0),
            make_metadata("big1.sst", 10000, 0),
            make_metadata("big2.sst", 15000, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);

        assert_eq!(tiers.len(), 2);
        assert_eq!(tiers[0].sstables.len(), 2);
        assert_eq!(tiers[1].sstables.len(), 2);
    }

    #[test]
    fn test_size_tiered_merge_trigger() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_none(), "Should not trigger with only 3 SSTables");

        let sstables = vec![
            make_metadata("a.sst", 1000, 0),
            make_metadata("b.sst", 1200, 0),
            make_metadata("c.sst", 1500, 0),
            make_metadata("d.sst", 1800, 0),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(
            task.is_some(),
            "Should trigger with 4 SSTables in same tier"
        );

        let task = task.expect("task should be Some");
        assert_eq!(task.source_sstables.len(), 4);
        assert_eq!(task.target_level, 1);
    }

    #[test]
    fn test_size_tiered_filters_small_sstables() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 500,
            size_ratio: 2.0,
            min_tier_size: 4,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let sstables = vec![
            make_metadata("a.sst", 100, 0),
            make_metadata("b.sst", 200, 0),
            make_metadata("c.sst", 300, 0),
            make_metadata("d.sst", 400, 0),
        ];

        let tiers = planner.group_by_size_tier(sstables);
        assert!(tiers.is_empty());
    }

    // --- CompactionStats ---

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        let snapshot = stats.snapshot();

        assert_eq!(snapshot.bytes_read, 0);
        assert_eq!(snapshot.bytes_written, 0);
        assert_eq!(snapshot.files_merged, 0);
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.total_duration_ms, 0);
        assert_eq!(snapshot.keys_processed, 0);
        assert_eq!(snapshot.tombstones_removed, 0);
    }

    #[test]
    fn test_compaction_stats_atomic_updates() {
        let stats = CompactionStats::default();

        stats.bytes_read.fetch_add(1000, Ordering::Relaxed);
        stats.bytes_written.fetch_add(500, Ordering::Relaxed);
        stats.files_merged.fetch_add(3, Ordering::Relaxed);
        stats.compactions_completed.fetch_add(1, Ordering::Relaxed);
        stats.total_duration_ms.fetch_add(42, Ordering::Relaxed);
        stats.keys_processed.fetch_add(100, Ordering::Relaxed);
        stats.tombstones_removed.fetch_add(5, Ordering::Relaxed);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.bytes_written, 500);
        assert_eq!(snapshot.files_merged, 3);
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.total_duration_ms, 42);
        assert_eq!(snapshot.keys_processed, 100);
        assert_eq!(snapshot.tombstones_removed, 5);
    }

    #[test]
    fn test_compaction_stats_thread_safety() {
        let stats = Arc::new(CompactionStats::default());

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let stats_clone = Arc::clone(&stats);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        stats_clone.bytes_read.fetch_add(1, Ordering::Relaxed);
                        stats_clone.keys_processed.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread should complete");
        }

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.bytes_read, 1000);
        assert_eq!(snapshot.keys_processed, 1000);
    }

    // --- CompactionThrottler ---

    #[test]
    fn test_throttler_disabled() {
        let mut throttler = CompactionThrottler::new(0);
        assert!(!throttler.is_enabled());

        let start = Instant::now();
        throttler.throttle(1_000_000);
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50));
    }

    #[test]
    fn test_throttler_enabled() {
        // 10 KB/s limit, 20 KB written: expect roughly a 2s delay.
        let mut throttler = CompactionThrottler::new(10_000);
        assert!(throttler.is_enabled());

        let start = Instant::now();
        throttler.throttle(20_000);
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_millis(1500),
            "Expected >= 1.5s delay, got {:?}",
            elapsed
        );
    }

    #[test]
    fn test_throttler_small_writes_no_delay() {
        // Generous limit, tiny write: should return immediately.
        let mut throttler = CompactionThrottler::new(1_000_000);
        assert!(throttler.is_enabled());

        let start = Instant::now();
        throttler.throttle(100);
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50));
    }

    // --- Tombstone tracking ---

    #[test]
    fn test_tombstone_registration() {
        let config = SSTableConfig::default();
        let mut executor = CompactionExecutor::new(config);

        let key = Key::from_str("test_key");
        let created_at = Instant::now();
        executor.register_tombstone(key.clone(), created_at);

        // Fresh tombstone under the default week-long TTL.
        assert!(!executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_tombstone_expiry_with_short_ttl() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_millis(1),
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);

        let key = Key::from_str("expired_key");
        let old_time = Instant::now() - Duration::from_millis(10);
        executor.register_tombstone(key.clone(), old_time);

        assert!(executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let compaction_config = CompactionConfig {
            tombstone_ttl: Duration::from_secs(3600),
            ..Default::default()
        };
        let mut executor = CompactionExecutor::with_compaction_config(config, compaction_config);

        let key = Key::from_str("fresh_key");
        executor.register_tombstone(key.clone(), Instant::now());

        assert!(!executor.is_tombstone_expired(&key));
    }

    #[test]
    fn test_unknown_tombstone_not_expired() {
        let config = SSTableConfig::default();
        let executor = CompactionExecutor::new(config);

        let key = Key::from_str("unknown_key");
        assert!(!executor.is_tombstone_expired(&key));
    }

    // --- Planner edge cases ---

    #[test]
    fn test_plan_compaction_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let task = planner.plan_compaction(0, Vec::new(), Vec::new());
        assert!(task.is_none());
    }

    #[test]
    fn test_plan_compaction_single_sstable() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("single.sst"),
            min_key: Key::from_str("key_001"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        let task = planner.plan_compaction(0, source, Vec::new());
        assert!(task.is_some());
        let task = task.expect("task should be Some for L0");
        assert_eq!(task.source_sstables.len(), 1);
    }

    #[test]
    fn test_size_tiered_empty_input() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let task = planner.plan_size_tiered_compaction(Vec::new());
        assert!(task.is_none());
    }

    #[test]
    fn test_find_overlapping_empty_source() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("key_000"),
            max_key: Key::from_str("key_010"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];

        let overlapping = planner.find_overlapping_sstables(&[], &target);
        assert!(overlapping.is_empty());
    }

    #[test]
    fn test_find_overlapping_no_overlap() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);

        let source = vec![SSTableMetadata {
            path: PathBuf::from("s1.sst"),
            min_key: Key::from_str("aaa"),
            max_key: Key::from_str("bbb"),
            num_entries: 10,
            file_size: 1000,
            level: 0,
        }];

        let target = vec![SSTableMetadata {
            path: PathBuf::from("t1.sst"),
            min_key: Key::from_str("zzz_000"),
            max_key: Key::from_str("zzz_999"),
            num_entries: 10,
            file_size: 1000,
            level: 1,
        }];

        let overlapping = planner.find_overlapping_sstables(&source, &target);
        assert!(overlapping.is_empty());
    }

    #[test]
    fn test_compaction_config_defaults() {
        let config = CompactionConfig::default();
        assert_eq!(config.strategy, CompactionStrategy::LevelBased);
        assert_eq!(config.l0_threshold, 4);
        assert_eq!(config.level_multiplier, 10);
        assert_eq!(config.min_sstable_size, 1024);
        assert_eq!(config.size_ratio, 2.0);
        assert_eq!(config.min_tier_size, 4);
        assert_eq!(config.max_compaction_bytes_per_sec, 0);
        assert_eq!(config.tombstone_ttl, Duration::from_secs(7 * 24 * 3600));
    }

    #[test]
    fn test_executor_stats_accessible() {
        let executor = CompactionExecutor::new(SSTableConfig::default());
        let snapshot = executor.stats_snapshot();
        assert_eq!(snapshot.compactions_completed, 0);
        assert_eq!(snapshot.bytes_read, 0);
    }

    #[test]
    fn test_size_tiered_preserves_level_info() {
        let config = CompactionConfig {
            strategy: CompactionStrategy::SizeTiered,
            min_sstable_size: 100,
            size_ratio: 2.0,
            min_tier_size: 2,
            ..Default::default()
        };
        let planner = CompactionPlanner::new(config);

        let sstables = vec![
            make_metadata("a.sst", 1000, 2),
            make_metadata("b.sst", 1500, 2),
        ];

        let task = planner.plan_size_tiered_compaction(sstables);
        assert!(task.is_some());
        let task = task.expect("task should be Some");
        assert_eq!(task.source_level, 2);
        assert_eq!(task.target_level, 3);
    }

    #[test]
    fn test_level_target_size_l0() {
        let config = CompactionConfig::default();
        let planner = CompactionPlanner::new(config);
        assert_eq!(planner.level_target_size(0), 0);
    }

    // --- CompactionExecutor end-to-end ---

    #[test]
    fn test_executor_compaction_with_stats() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_compaction_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");

        let sstable_config = SSTableConfig::default();

        let path1 = temp_dir.join("L0_00000001.sst");
        let path2 = temp_dir.join("L0_00000002.sst");

        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );
        create_test_sstable(
            &path2,
            &sstable_config,
            &[("key_03", "val_03"), ("key_04", "val_04")],
        );

        let meta1 = make_file_metadata(&path1, 0);
        let meta2 = make_file_metadata(&path2, 0);

        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1, meta2],
            target_sstables: Vec::new(),
        };

        let mut executor = CompactionExecutor::new(sstable_config);
        let mut next_id = 100u64;

        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        assert!(result.is_ok(), "compaction should succeed");

        let output = result.expect("should have output");
        assert!(!output.is_empty(), "should produce output SSTables");

        let snapshot = executor.stats_snapshot();
        assert!(snapshot.bytes_read > 0, "should have read bytes");
        assert!(snapshot.bytes_written > 0, "should have written bytes");
        assert_eq!(snapshot.compactions_completed, 1);
        assert_eq!(snapshot.files_merged, 2);
        assert!(
            snapshot.keys_processed >= 4,
            "should have processed at least 4 keys"
        );
        assert!(
            snapshot.total_duration_ms < 10_000,
            "should complete quickly"
        );

        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    #[test]
    fn test_executor_compaction_with_throttling() {
        let temp_dir =
            std::env::temp_dir().join(format!("amaters_throttle_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).expect("should create temp dir");

        let sstable_config = SSTableConfig::default();

        let path1 = temp_dir.join("L0_00000001.sst");
        create_test_sstable(
            &path1,
            &sstable_config,
            &[("key_01", "val_01"), ("key_02", "val_02")],
        );

        let meta1 = make_file_metadata(&path1, 0);

        // Throttling disabled: the compaction must not be delayed.
        let compaction_config = CompactionConfig {
            max_compaction_bytes_per_sec: 0,
            ..Default::default()
        };

        let task = CompactionTask {
            source_level: 0,
            target_level: 1,
            source_sstables: vec![meta1],
            target_sstables: Vec::new(),
        };

        let mut executor =
            CompactionExecutor::with_compaction_config(sstable_config, compaction_config);
        let mut next_id = 200u64;

        let start = Instant::now();
        let result = executor.execute_compaction(task, &temp_dir, &mut next_id);
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed < Duration::from_secs(5));

        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    // --- Test helpers ---

    /// Builds metadata for a fictitious SSTable with synthetic key bounds.
    fn make_metadata(name: &str, file_size: u64, level: usize) -> SSTableMetadata {
        SSTableMetadata {
            path: PathBuf::from(name),
            min_key: Key::from_str(&format!("{}_min", name)),
            max_key: Key::from_str(&format!("{}_max", name)),
            num_entries: 10,
            file_size,
            level,
        }
    }

    /// Writes a real SSTable at `path` containing the given entries.
    fn create_test_sstable(path: &Path, config: &SSTableConfig, entries: &[(&str, &str)]) {
        let mut writer =
            SSTableWriter::new(path, config.clone()).expect("should create SSTable writer");
        for (k, v) in entries {
            let key = Key::from_str(k);
            let value = CipherBlob::new(v.as_bytes().to_vec());
            writer.add(key, value).expect("should add entry");
        }
        writer.finish().expect("should finish writing");
    }

    /// Builds metadata for an SSTable that exists on disk by reading it back.
    fn make_file_metadata(path: &Path, level: usize) -> SSTableMetadata {
        let file_size = std::fs::metadata(path)
            .expect("SSTable file should exist")
            .len();

        let reader = SSTableReader::open(path).expect("should open SSTable");
        let entries = reader.iter().expect("should read entries");

        let min_key = entries
            .first()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));
        let max_key = entries
            .last()
            .map(|(k, _)| k.clone())
            .unwrap_or_else(|| Key::from_str(""));

        SSTableMetadata {
            path: path.to_path_buf(),
            min_key,
            max_key,
            num_entries: entries.len(),
            file_size,
            level,
        }
    }
}