use std::fmt;
/// Limits that control when a compaction pass is requested.
///
/// The values are plain data; they are interpreted by
/// `CompactionScheduler::should_compact`.
#[derive(Debug, Clone, Copy)]
pub struct CompactionThresholds {
    /// Total operation count at or above which compaction is triggered.
    pub operation_threshold: u64,
    /// Tombstone percentage at or above which compaction is triggered.
    pub tombstone_ratio_threshold: u8,
    /// Minimum total operation count required before the tombstone ratio is
    /// evaluated at all.
    pub min_ops_for_ratio_check: u64,
}

impl Default for CompactionThresholds {
    /// Returns thresholds of 1000 operations, a 20% tombstone ratio, and a
    /// 100-operation minimum for the ratio check.
    fn default() -> Self {
        // `new` supplies the same 100-operation minimum used by the defaults.
        Self::new(1000, 20)
    }
}

impl CompactionThresholds {
    /// Builds thresholds from an operation limit and a tombstone percentage.
    ///
    /// The minimum operation count for the ratio check is set to 100; use
    /// [`with_min_ops`](Self::with_min_ops) to change it.
    #[must_use]
    pub fn new(operation_threshold: u64, tombstone_ratio_threshold: u8) -> Self {
        let min_ops_for_ratio_check = 100;
        Self {
            operation_threshold,
            tombstone_ratio_threshold,
            min_ops_for_ratio_check,
        }
    }

    /// Returns a copy of these thresholds with the ratio-check minimum
    /// replaced by `min_ops`.
    #[must_use]
    pub fn with_min_ops(self, min_ops: u64) -> Self {
        Self {
            min_ops_for_ratio_check: min_ops,
            ..self
        }
    }
}
/// Outcome of a compaction check: either nothing to do, or the reason
/// compaction was requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionTrigger {
    /// No threshold was met.
    None,
    /// The operation count reached the configured limit.
    OperationThreshold {
        /// Operation count that was observed.
        count: u64,
        /// Configured operation limit it was compared against.
        threshold: u64,
    },
    /// The tombstone percentage reached the configured limit.
    TombstoneRatio {
        /// Observed tombstone percentage.
        ratio: u8,
        /// Configured percentage it was compared against.
        threshold: u8,
    },
}

impl fmt::Display for CompactionTrigger {
    /// Writes a one-line, human-readable description of the trigger.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The enum is `Copy`, so match by value to bind plain integers.
        match *self {
            Self::None => f.write_str("no compaction needed"),
            Self::OperationThreshold { count, threshold } => write!(
                f,
                "operation threshold exceeded: {count} ops >= {threshold} threshold"
            ),
            Self::TombstoneRatio { ratio, threshold } => write!(
                f,
                "tombstone ratio exceeded: {ratio}% >= {threshold}% threshold"
            ),
        }
    }
}
/// Decides whether compaction should run by comparing operation counts
/// against a set of [`CompactionThresholds`].
#[derive(Debug, Clone, Default)]
pub struct CompactionScheduler {
    thresholds: CompactionThresholds,
}

impl CompactionScheduler {
    /// Creates a scheduler that evaluates the given thresholds.
    #[must_use]
    pub fn new(thresholds: CompactionThresholds) -> Self {
        Self { thresholds }
    }

    /// Returns the thresholds currently in effect.
    #[must_use]
    pub fn thresholds(&self) -> &CompactionThresholds {
        &self.thresholds
    }

    /// Replaces the thresholds used by subsequent checks.
    pub fn set_thresholds(&mut self, thresholds: CompactionThresholds) {
        self.thresholds = thresholds;
    }

    /// Checks the given counts against the configured thresholds.
    ///
    /// The checks run in this order:
    ///
    /// 1. If `total_ops` is at or above `operation_threshold`, returns
    ///    [`CompactionTrigger::OperationThreshold`].
    /// 2. Otherwise, if `total_ops` is non-zero and at least
    ///    `min_ops_for_ratio_check`, `remove_count` is expressed as a
    ///    percentage of `total_ops`; when that percentage is at or above
    ///    `tombstone_ratio_threshold`, returns
    ///    [`CompactionTrigger::TombstoneRatio`].
    /// 3. Otherwise returns [`CompactionTrigger::None`].
    ///
    /// `_add_count` is accepted but does not influence the result.
    #[must_use]
    pub fn should_compact(
        &self,
        total_ops: u64,
        _add_count: u64,
        remove_count: u64,
    ) -> CompactionTrigger {
        let limits = &self.thresholds;

        // Checked first, so the operation count wins when both conditions hold.
        if total_ops >= limits.operation_threshold {
            return CompactionTrigger::OperationThreshold {
                count: total_ops,
                threshold: limits.operation_threshold,
            };
        }

        // `total_ops > 0` stops an empty log from matching a 0% threshold
        // when the minimum operation count is also 0.
        if total_ops > 0 && total_ops >= limits.min_ops_for_ratio_check {
            let ratio = Self::calculate_ratio(remove_count, total_ops);
            if ratio >= limits.tombstone_ratio_threshold {
                return CompactionTrigger::TombstoneRatio {
                    ratio,
                    threshold: limits.tombstone_ratio_threshold,
                };
            }
        }

        CompactionTrigger::None
    }

    /// Runs [`should_compact`](Self::should_compact) using `committed_delta`
    /// as the total operation count and `committed_removes` as the remove
    /// count.
    ///
    /// The add count passed along is `committed_delta - committed_removes`,
    /// saturating at zero.
    #[must_use]
    pub fn should_compact_from_snapshot(
        &self,
        committed_delta: u64,
        committed_removes: u64,
    ) -> CompactionTrigger {
        let estimated_adds = committed_delta.saturating_sub(committed_removes);
        self.should_compact(committed_delta, estimated_adds, committed_removes)
    }

    /// Returns `removes` as a whole-number percentage of `total`, rounded
    /// down and clamped to `0..=100`.
    ///
    /// Returns 0 when `total` is 0.
    #[inline]
    fn calculate_ratio(removes: u64, total: u64) -> u8 {
        if total == 0 {
            return 0;
        }
        // Widen to u128 so `removes * 100` is exact for every u64 input; a
        // saturating u64 multiply under-reports the ratio for large counts.
        let percent = u128::from(removes) * 100 / u128::from(total);
        // `removes` may exceed `total`, so clamp before narrowing.
        u8::try_from(percent.min(100)).unwrap_or(100)
    }
}
/// Running counters describing the outcomes of compaction checks.
#[derive(Debug, Clone, Copy, Default)]
pub struct SchedulerStats {
    /// Number of check results recorded.
    pub check_count: u64,
    /// Number of recorded results that requested compaction.
    pub trigger_count: u64,
    /// Number of recorded results that were operation-threshold triggers.
    pub operation_triggers: u64,
    /// Number of recorded results that were tombstone-ratio triggers.
    pub ratio_triggers: u64,
}

impl SchedulerStats {
    /// Records the outcome of one compaction check.
    ///
    /// Every call increments `check_count`; any trigger other than
    /// [`CompactionTrigger::None`] also increments `trigger_count` and the
    /// counter for its kind.
    pub fn record(&mut self, trigger: &CompactionTrigger) {
        self.check_count += 1;
        match trigger {
            CompactionTrigger::None => {}
            CompactionTrigger::OperationThreshold { .. } => {
                self.trigger_count += 1;
                self.operation_triggers += 1;
            }
            CompactionTrigger::TombstoneRatio { .. } => {
                self.trigger_count += 1;
                self.ratio_triggers += 1;
            }
        }
    }

    /// Returns `trigger_count` as a whole-number percentage of
    /// `check_count`, rounded down and clamped to `0..=100`.
    ///
    /// Returns 0 when no checks have been recorded.
    #[must_use]
    pub fn trigger_rate(&self) -> u8 {
        if self.check_count == 0 {
            return 0;
        }
        // Widen to u128 so `trigger_count * 100` is exact for every u64
        // value; a saturating u64 multiply under-reports the rate for large
        // counters.
        let percent = u128::from(self.trigger_count) * 100 / u128::from(self.check_count);
        // The fields are public, so `trigger_count` may exceed `check_count`;
        // clamp before narrowing.
        u8::try_from(percent.min(100)).unwrap_or(100)
    }
}
/// Unit tests for the threshold configuration, the scheduler's decisions,
/// trigger formatting, and the statistics counters.
#[cfg(test)]
mod tests {
    use super::*;

    /// The default thresholds are 1000 ops, 20%, and a 100-op minimum.
    #[test]
    fn test_default_thresholds() {
        let defaults = CompactionThresholds::default();
        assert_eq!(defaults.operation_threshold, 1000);
        assert_eq!(defaults.tombstone_ratio_threshold, 20);
        assert_eq!(defaults.min_ops_for_ratio_check, 100);
    }

    /// `new` stores the two values it is given.
    #[test]
    fn test_custom_thresholds() {
        let custom = CompactionThresholds::new(500, 30);
        assert_eq!(custom.operation_threshold, 500);
        assert_eq!(custom.tombstone_ratio_threshold, 30);
    }

    /// `with_min_ops` overrides the ratio-check minimum.
    #[test]
    fn test_thresholds_with_min_ops() {
        let custom = CompactionThresholds::new(500, 30).with_min_ops(50);
        assert_eq!(custom.min_ops_for_ratio_check, 50);
    }

    /// A default scheduler carries the default thresholds.
    #[test]
    fn test_scheduler_default() {
        let scheduler = CompactionScheduler::default();
        assert_eq!(scheduler.thresholds().operation_threshold, 1000);
    }

    /// Neither threshold is met: 500 ops, 10% tombstones.
    #[test]
    fn test_no_compaction_below_threshold() {
        let outcome = CompactionScheduler::default().should_compact(500, 450, 50);
        assert_eq!(outcome, CompactionTrigger::None);
    }

    /// Hitting the operation threshold exactly triggers compaction.
    #[test]
    fn test_operation_threshold_trigger() {
        let outcome = CompactionScheduler::default().should_compact(1000, 800, 200);
        let CompactionTrigger::OperationThreshold { count, threshold } = outcome else {
            panic!("expected OperationThreshold trigger")
        };
        assert_eq!(count, 1000);
        assert_eq!(threshold, 1000);
    }

    /// Exceeding the operation threshold reports the observed count.
    #[test]
    fn test_operation_threshold_exceeded() {
        let outcome = CompactionScheduler::default().should_compact(1500, 1200, 300);
        let CompactionTrigger::OperationThreshold { count, .. } = outcome else {
            panic!("expected OperationThreshold trigger")
        };
        assert_eq!(count, 1500);
    }

    /// 50 removes out of 150 ops is 33%, above the 20% threshold.
    #[test]
    fn test_tombstone_ratio_trigger() {
        let scheduler = CompactionScheduler::new(CompactionThresholds::new(2000, 20));
        let outcome = scheduler.should_compact(150, 100, 50);
        let CompactionTrigger::TombstoneRatio { ratio, threshold } = outcome else {
            panic!("expected TombstoneRatio trigger")
        };
        assert_eq!(ratio, 33);
        assert_eq!(threshold, 20);
    }

    /// A 100% tombstone ratio is ignored below the minimum op count.
    #[test]
    fn test_tombstone_ratio_below_min_ops() {
        let outcome = CompactionScheduler::default().should_compact(50, 0, 50);
        assert_eq!(outcome, CompactionTrigger::None);
    }

    /// When both conditions hold, the operation threshold is reported.
    #[test]
    fn test_operation_threshold_takes_priority() {
        let scheduler = CompactionScheduler::new(CompactionThresholds::new(100, 20));
        let outcome = scheduler.should_compact(100, 20, 80);
        assert!(
            matches!(outcome, CompactionTrigger::OperationThreshold { .. }),
            "expected OperationThreshold trigger (takes priority)"
        );
    }

    /// Each variant renders its values into the display text.
    #[test]
    fn test_trigger_display() {
        let idle_text = CompactionTrigger::None.to_string();
        assert_eq!(idle_text, "no compaction needed");

        let ops_text = CompactionTrigger::OperationThreshold {
            count: 1500,
            threshold: 1000,
        }
        .to_string();
        assert!(ops_text.contains("1500 ops"));
        assert!(ops_text.contains("1000 threshold"));

        let ratio_text = CompactionTrigger::TombstoneRatio {
            ratio: 25,
            threshold: 20,
        }
        .to_string();
        assert!(ratio_text.contains("25%"));
        assert!(ratio_text.contains("20% threshold"));
    }

    /// `record` bumps the check count always and trigger counts per kind.
    #[test]
    fn test_scheduler_stats_recording() {
        let mut stats = SchedulerStats::default();

        stats.record(&CompactionTrigger::None);
        assert_eq!(stats.check_count, 1);
        assert_eq!(stats.trigger_count, 0);

        let by_ops = CompactionTrigger::OperationThreshold {
            count: 1000,
            threshold: 1000,
        };
        stats.record(&by_ops);
        assert_eq!(stats.check_count, 2);
        assert_eq!(stats.trigger_count, 1);
        assert_eq!(stats.operation_triggers, 1);

        let by_ratio = CompactionTrigger::TombstoneRatio {
            ratio: 30,
            threshold: 20,
        };
        stats.record(&by_ratio);
        assert_eq!(stats.check_count, 3);
        assert_eq!(stats.trigger_count, 2);
        assert_eq!(stats.ratio_triggers, 1);
    }

    /// The trigger rate is a whole-number percentage, 0 with no checks.
    #[test]
    fn test_trigger_rate_calculation() {
        let mut stats = SchedulerStats::default();
        assert_eq!(stats.trigger_rate(), 0);

        stats.check_count = 4;
        stats.trigger_count = 1;
        assert_eq!(stats.trigger_rate(), 25);

        stats.trigger_count = 3;
        assert_eq!(stats.trigger_rate(), 75);
    }

    /// The snapshot helper forwards the delta as the total op count.
    #[test]
    fn test_should_compact_from_snapshot() {
        let scheduler = CompactionScheduler::default();

        let quiet = scheduler.should_compact_from_snapshot(500, 50);
        assert_eq!(quiet, CompactionTrigger::None);

        let busy = scheduler.should_compact_from_snapshot(1500, 300);
        let CompactionTrigger::OperationThreshold { count, .. } = busy else {
            panic!("expected OperationThreshold")
        };
        assert_eq!(count, 1500);
    }

    /// Zero totals, full ratios, and zero removes all give sane percentages.
    #[test]
    fn test_calculate_ratio_edge_cases() {
        assert_eq!(CompactionScheduler::calculate_ratio(10, 0), 0);
        assert_eq!(CompactionScheduler::calculate_ratio(100, 100), 100);
        assert_eq!(CompactionScheduler::calculate_ratio(0, 100), 0);
        assert_eq!(CompactionScheduler::calculate_ratio(25, 100), 25);
    }

    /// `set_thresholds` replaces the scheduler's configuration.
    #[test]
    fn test_set_thresholds() {
        let mut scheduler = CompactionScheduler::default();
        assert_eq!(scheduler.thresholds().operation_threshold, 1000);

        scheduler.set_thresholds(CompactionThresholds::new(500, 30));
        let updated = scheduler.thresholds();
        assert_eq!(updated.operation_threshold, 500);
        assert_eq!(updated.tombstone_ratio_threshold, 30);
    }
}