use std::{
sync::Arc,
time::{
Duration,
Instant,
},
};
use crate::{
compaction::workload::{
WorkloadAnalysis,
WorkloadPattern,
WorkloadStats,
},
levels::CompactionStrategy,
};
#[derive(Debug, Clone)]
pub struct AdaptationPolicy {
pub min_confidence: f64,
pub min_adaptation_interval: Duration,
pub min_ops_before_adapt: u64,
pub read_amp_threshold: f64,
pub write_amp_threshold: f64,
}
impl Default for AdaptationPolicy {
fn default() -> Self {
Self {
min_confidence: 0.7,
min_adaptation_interval: Duration::from_secs(300), min_ops_before_adapt: 10000,
read_amp_threshold: 5.0,
write_amp_threshold: 20.0,
}
}
}
pub struct WorkloadAdaptor {
stats: Arc<WorkloadStats>,
policy: AdaptationPolicy,
last_adaptation: Option<Instant>,
current_strategy: Option<CompactionStrategy>,
}
impl WorkloadAdaptor {
pub fn new(stats: Arc<WorkloadStats>, policy: AdaptationPolicy) -> Self {
Self {
stats,
policy,
last_adaptation: None,
current_strategy: None,
}
}
pub fn recommend_strategy(&mut self) -> Option<StrategyRecommendation> {
let analysis = self.stats.analyze();
if analysis.confidence < self.policy.min_confidence {
return None;
}
let snapshot = self.stats.snapshot();
let total_ops = snapshot.gets + snapshot.puts + snapshot.deletes + snapshot.scans;
if total_ops < self.policy.min_ops_before_adapt {
return None;
}
if let Some(last) = self.last_adaptation {
if last.elapsed() < self.policy.min_adaptation_interval {
return None;
}
}
let recommended = match analysis.pattern {
| WorkloadPattern::WriteHeavy => {
CompactionStrategy::Leveled {
fanout: 10,
target_file_count: 10,
}
},
| WorkloadPattern::ReadHeavy => {
CompactionStrategy::Tiered {
size_ratio: 2.0,
min_merge_width: 2,
max_merge_width: 4,
}
},
| WorkloadPattern::ScanHeavy => {
CompactionStrategy::Leveled {
fanout: 10,
target_file_count: 10,
}
},
| WorkloadPattern::PointLookup => {
CompactionStrategy::Tiered {
size_ratio: 2.0,
min_merge_width: 2,
max_merge_width: 4,
}
},
| WorkloadPattern::Balanced => {
CompactionStrategy::Leveled {
fanout: 8,
target_file_count: 8,
}
},
};
let should_switch = if let Some(ref current) = self.current_strategy {
!self.strategies_equivalent(current, &recommended)
} else {
true };
if !should_switch {
return None;
}
let reason = if analysis.read_amplification > self.policy.read_amp_threshold {
ChangeReason::HighReadAmplification(analysis.read_amplification)
} else if analysis.write_amplification > self.policy.write_amp_threshold {
ChangeReason::HighWriteAmplification(analysis.write_amplification)
} else {
ChangeReason::WorkloadPatternChange(analysis.pattern)
};
self.last_adaptation = Some(Instant::now());
self.current_strategy = Some(recommended.clone());
Some(StrategyRecommendation {
strategy: recommended,
reason,
analysis: analysis.clone(),
})
}
fn strategies_equivalent(&self, a: &CompactionStrategy, b: &CompactionStrategy) -> bool {
match (a, b) {
| (CompactionStrategy::Tiered { .. }, CompactionStrategy::Tiered { .. }) => true,
| (CompactionStrategy::Leveled { .. }, CompactionStrategy::Leveled { .. }) => true,
| (CompactionStrategy::Universal { .. }, CompactionStrategy::Universal { .. }) => true,
| _ => false,
}
}
pub fn current_analysis(&self) -> WorkloadAnalysis {
self.stats.analyze()
}
pub fn reset(&mut self) {
self.last_adaptation = None;
self.current_strategy = None;
}
}
#[derive(Debug, Clone)]
pub enum ChangeReason {
WorkloadPatternChange(WorkloadPattern),
HighReadAmplification(f64),
HighWriteAmplification(f64),
}
impl std::fmt::Display for ChangeReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
| Self::WorkloadPatternChange(pattern) => {
write!(f, "Workload pattern changed to {:?}", pattern)
},
| Self::HighReadAmplification(amp) => {
write!(f, "High read amplification ({:.2}x)", amp)
},
| Self::HighWriteAmplification(amp) => {
write!(f, "High write amplification ({:.2}x)", amp)
},
}
}
}
#[derive(Debug, Clone)]
pub struct StrategyRecommendation {
pub strategy: CompactionStrategy,
pub reason: ChangeReason,
pub analysis: WorkloadAnalysis,
}
impl std::fmt::Display for StrategyRecommendation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Recommend {:?} strategy: {} ({})",
self.strategy, self.reason, self.analysis
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_adaptor_creation() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy::default();
let adaptor = WorkloadAdaptor::new(stats, policy);
assert!(adaptor.last_adaptation.is_none());
assert!(adaptor.current_strategy.is_none());
}
#[test]
fn test_not_enough_confidence() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_confidence: 0.9,
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(stats, policy);
assert!(adaptor.recommend_strategy().is_none());
}
#[test]
fn test_write_heavy_recommendation() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 100,
min_confidence: 0.6,
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..800 {
stats.record_put(1000);
}
for _ in 0..200 {
stats.record_get(1000);
}
let recommendation = adaptor.recommend_strategy();
assert!(recommendation.is_some());
let rec = recommendation.unwrap();
assert!(matches!(rec.strategy, CompactionStrategy::Leveled { .. }));
assert_eq!(rec.analysis.pattern, WorkloadPattern::WriteHeavy);
}
#[test]
fn test_read_heavy_recommendation() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 100,
min_confidence: 0.6,
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..800 {
stats.record_get(1000);
}
for _ in 0..200 {
stats.record_put(1000);
}
let recommendation = adaptor.recommend_strategy();
assert!(recommendation.is_some());
let rec = recommendation.unwrap();
assert!(matches!(rec.strategy, CompactionStrategy::Tiered { .. }));
assert_eq!(rec.analysis.pattern, WorkloadPattern::ReadHeavy);
}
#[test]
fn test_scan_heavy_recommendation() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 100,
min_confidence: 0.6,
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..500 {
stats.record_scan(100, 10000);
}
for _ in 0..500 {
stats.record_get(1000);
}
let recommendation = adaptor.recommend_strategy();
assert!(recommendation.is_some());
let rec = recommendation.unwrap();
assert!(matches!(rec.strategy, CompactionStrategy::Leveled { .. }));
assert_eq!(rec.analysis.pattern, WorkloadPattern::ScanHeavy);
}
#[test]
fn test_min_ops_threshold() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 1000,
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..500 {
stats.record_put(1000);
}
assert!(adaptor.recommend_strategy().is_none());
for _ in 0..600 {
stats.record_put(1000);
}
assert!(adaptor.recommend_strategy().is_some());
}
#[test]
fn test_adaptation_interval() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 100,
min_confidence: 0.6,
min_adaptation_interval: Duration::from_secs(60),
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..800 {
stats.record_put(1000);
}
for _ in 0..200 {
stats.record_get(1000);
}
assert!(adaptor.recommend_strategy().is_some());
stats.reset();
for _ in 0..800 {
stats.record_get(1000);
}
for _ in 0..200 {
stats.record_put(1000);
}
assert!(adaptor.recommend_strategy().is_none());
}
#[test]
fn test_no_change_same_strategy() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy {
min_ops_before_adapt: 100,
min_confidence: 0.6,
min_adaptation_interval: Duration::from_secs(0),
..Default::default()
};
let mut adaptor = WorkloadAdaptor::new(Arc::clone(&stats), policy);
for _ in 0..800 {
stats.record_put(1000);
}
for _ in 0..200 {
stats.record_get(1000);
}
assert!(adaptor.recommend_strategy().is_some());
stats.reset();
for _ in 0..800 {
stats.record_put(1000);
}
for _ in 0..200 {
stats.record_get(1000);
}
assert!(adaptor.recommend_strategy().is_none());
}
#[test]
fn test_reset() {
let stats = Arc::new(WorkloadStats::new());
let policy = AdaptationPolicy::default();
let mut adaptor = WorkloadAdaptor::new(stats, policy);
adaptor.last_adaptation = Some(Instant::now());
adaptor.current_strategy = Some(CompactionStrategy::Leveled {
fanout: 10,
target_file_count: 10,
});
adaptor.reset();
assert!(adaptor.last_adaptation.is_none());
assert!(adaptor.current_strategy.is_none());
}
}