use crate::{
compaction::SchedulerConfig,
state::DbStorageBuilder,
};
#[derive(Debug, Clone)]
pub struct ConfigPoint {
pub storage: DbStorageBuilder,
pub scheduler: SchedulerConfig,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParameterCategory {
Memory,
Compaction,
Threading,
}
pub struct ParameterSpace {
pub memtable_sizes: Vec<u64>,
pub max_memtables: Vec<u64>,
pub target_segment_sizes: Vec<u64>,
pub target_file_size_multipliers: Vec<u64>,
pub l0_triggers: Vec<usize>,
pub l0_stop_triggers: Vec<usize>,
pub max_concurrent_jobs: Vec<usize>,
pub score_thresholds: Vec<f64>,
pub min_workers: Vec<usize>,
pub max_workers: Vec<usize>,
}
impl Default for ParameterSpace {
fn default() -> Self {
Self {
memtable_sizes: vec![
16 * 1024 * 1024,
32 * 1024 * 1024,
64 * 1024 * 1024,
128 * 1024 * 1024,
],
max_memtables: vec![2, 4, 8, 16],
target_segment_sizes: vec![32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024],
target_file_size_multipliers: vec![1, 2, 4],
l0_triggers: vec![4, 8, 16, 32],
l0_stop_triggers: vec![8, 16, 32, 64],
max_concurrent_jobs: vec![2, 4, 8, 16],
score_thresholds: vec![0.5, 1.0, 2.0],
min_workers: vec![1, 2, 4],
max_workers: vec![2, 4, 8],
}
}
}
impl ParameterSpace {
pub fn quick() -> Self {
Self {
memtable_sizes: vec![32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024],
max_memtables: vec![4, 8],
target_segment_sizes: vec![64 * 1024 * 1024, 128 * 1024 * 1024],
target_file_size_multipliers: vec![1, 2],
l0_triggers: vec![8, 16],
l0_stop_triggers: vec![16, 32],
max_concurrent_jobs: vec![4, 8],
score_thresholds: vec![1.0],
min_workers: vec![2, 4],
max_workers: vec![4, 8],
}
}
}
pub struct CoordinateDescent {
space: ParameterSpace,
}
impl CoordinateDescent {
pub fn new(space: ParameterSpace) -> Self {
Self { space }
}
pub fn iterate(&self, base: &ConfigPoint) -> Vec<(ParameterCategory, Vec<ConfigPoint>)> {
let num_cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let mut rounds = Vec::new();
let mut memory_cfgs = Vec::new();
for &size in &self.space.memtable_sizes {
if size == base.storage.memtable_size {
continue;
}
let mut cfg = base.clone();
cfg.storage.memtable_size = size;
memory_cfgs.push(cfg);
}
for &limit in &self.space.max_memtables {
if limit == base.storage.num_memtable_limit {
continue;
}
let mut cfg = base.clone();
cfg.storage.num_memtable_limit = limit;
memory_cfgs.push(cfg);
}
rounds.push((ParameterCategory::Memory, memory_cfgs));
let mut compaction_cfgs = Vec::new();
for &size in &self.space.target_segment_sizes {
if size == base.scheduler.target_segment_size {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.target_segment_size = size;
cfg.storage.target_segment_size = size;
compaction_cfgs.push(cfg);
}
for &mult in &self.space.target_file_size_multipliers {
if mult == base.scheduler.target_file_size_multiplier {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.target_file_size_multiplier = mult;
compaction_cfgs.push(cfg);
}
for &trigger in &self.space.l0_triggers {
if trigger == base.scheduler.l0_compaction_trigger {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.l0_compaction_trigger = trigger;
compaction_cfgs.push(cfg);
}
for &stop in &self.space.l0_stop_triggers {
if stop == base.scheduler.l0_stop_writes_trigger {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.l0_stop_writes_trigger = stop;
compaction_cfgs.push(cfg);
}
for &max_jobs in &self.space.max_concurrent_jobs {
if max_jobs == base.scheduler.max_concurrent_jobs {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.max_concurrent_jobs = max_jobs;
compaction_cfgs.push(cfg);
}
for &threshold in &self.space.score_thresholds {
if (threshold - base.scheduler.score_threshold).abs() < f64::EPSILON {
continue;
}
let mut cfg = base.clone();
cfg.scheduler.score_threshold = threshold;
compaction_cfgs.push(cfg);
}
rounds.push((ParameterCategory::Compaction, compaction_cfgs));
let mut threading_cfgs = Vec::new();
for &min_w in &self.space.min_workers {
let mut cfg = base.clone();
cfg.scheduler.max_concurrent_jobs = min_w.max(cfg.scheduler.max_concurrent_jobs);
threading_cfgs.push(cfg);
}
for &max_w in &self.space.max_workers {
let mut cfg = base.clone();
if max_w >= num_cpus / 2 {
cfg.scheduler.max_concurrent_jobs = max_w;
threading_cfgs.push(cfg);
}
}
rounds.push((ParameterCategory::Threading, threading_cfgs));
rounds
}
}