use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::Instant;
/// Number of bandit arms, i.e. distinct tuning presets the classifier chooses between.
const NUM_ARMS: usize = 8;
/// UCB1 exploration coefficient (√2, the classic UCB1 choice).
///
/// Uses the exact std constant: the former truncated literal `1.41421356` is less precise and
/// is rejected by Clippy's deny-by-default `approx_constant` lint.
const UCB_C: f64 = std::f64::consts::SQRT_2;
/// Intended size of a sliding feature window. Not referenced anywhere in this file yet,
/// hence the `dead_code` allowance.
#[allow(dead_code)]
const FEATURE_WINDOW_SIZE: usize = 1000;
/// Coarse workload category produced by `FeatureVector::classify`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Transactional (write-heavy) traffic.
    Oltp,
    /// Analytical (scan- or read-heavy) traffic.
    Olap,
    /// No single pattern dominates.
    Mixed,
    /// Vector-search traffic.
    VectorSearch,
    /// Too little traffic to classify.
    Unknown,
}

impl WorkloadType {
    /// Returns the static human-readable label for this workload type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WorkloadType::Oltp => "OLTP",
            WorkloadType::Olap => "OLAP",
            WorkloadType::Mixed => "Mixed",
            WorkloadType::VectorSearch => "VectorSearch",
            WorkloadType::Unknown => "Unknown",
        }
    }
}
/// Per-kind operation tallies, one atomic counter per kind, all starting at zero.
#[derive(Default)]
struct OperationCounters {
    /// Point reads.
    point_reads: AtomicU64,
    /// Range scans.
    range_scans: AtomicU64,
    /// Inserts.
    inserts: AtomicU64,
    /// Updates.
    updates: AtomicU64,
    /// Deletes.
    deletes: AtomicU64,
    /// Vector searches.
    vector_searches: AtomicU64,
}
/// Summary of the observed operation mix, consumed by [`FeatureVector::classify`].
#[derive(Debug, Clone, Default)]
pub struct FeatureVector {
    /// Share of operations that are reads (point reads plus range scans).
    pub read_fraction: f64,
    /// Share of operations that are writes (inserts, updates and deletes).
    pub write_fraction: f64,
    /// Share of operations that are range scans (these also count toward `read_fraction`).
    pub scan_fraction: f64,
    /// Share of operations that are vector searches.
    pub vector_fraction: f64,
    /// Average latency in milliseconds; `WorkloadClassifier::extract_features` fills in a
    /// fixed placeholder.
    pub avg_latency_ms: f64,
    /// Observed operation rate, in operations per second.
    pub ops_per_second: f64,
    /// Key-locality score; `WorkloadClassifier::extract_features` fills in a fixed placeholder.
    pub key_locality: f64,
}

impl FeatureVector {
    /// Maps the features to a [`WorkloadType`] using fixed thresholds, checked in order:
    ///
    /// 1. fewer than 1 op/s → `Unknown`
    /// 2. vector searches above 30% → `VectorSearch`
    /// 3. writes above 70% → `Oltp`
    /// 4. scans above 30%, or reads above 70% → `Olap`
    /// 5. anything else → `Mixed`
    pub fn classify(&self) -> WorkloadType {
        // NOTE(review): rule 4 also labels a point-read-heavy mix (reads > 70%, scans <= 30%)
        // as OLAP; confirm that is intended rather than OLTP.
        if self.ops_per_second < 1.0 {
            WorkloadType::Unknown
        } else if self.vector_fraction > 0.3 {
            WorkloadType::VectorSearch
        } else if self.write_fraction > 0.7 {
            WorkloadType::Oltp
        } else if self.scan_fraction > 0.3 || self.read_fraction > 0.7 {
            WorkloadType::Olap
        } else {
            WorkloadType::Mixed
        }
    }
}
/// Storage-engine tuning knobs; each bandit arm corresponds to one preset of these.
#[derive(Debug, Clone)]
pub struct TuningConfig {
    /// Memtable size. Presets are written as `N * 1024 * 1024` (N MiB, assuming the unit is bytes).
    pub memtable_size: usize,
    /// Number of write buffers.
    pub write_buffer_count: usize,
    /// Batch size.
    pub batch_size: usize,
    /// Prefetch distance.
    pub prefetch_distance: usize,
    /// Flush interval in milliseconds.
    pub flush_interval_ms: u64,
    /// Which side compaction favours.
    pub compaction_priority: CompactionPriority,
    /// Cache share as a ratio (the presets use values between 0.2 and 0.95).
    pub cache_ratio: f64,
    /// Presumably the HNSW `ef_search` width used for vector search — TODO confirm.
    pub hnsw_ef_search: usize,
}

impl Default for TuningConfig {
    /// Balanced baseline: 64 MiB memtable, 2 write buffers, batch 256, prefetch 4,
    /// 1 s flush interval, 50% cache ratio, `ef_search` 100.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        TuningConfig {
            memtable_size: 64 * MIB,
            write_buffer_count: 2,
            batch_size: 256,
            prefetch_distance: 4,
            flush_interval_ms: 1000,
            compaction_priority: CompactionPriority::Balanced,
            cache_ratio: 0.5,
            hnsw_ef_search: 100,
        }
    }
}

/// Compaction strategy preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPriority {
    /// Favour write throughput.
    WriteOptimized,
    /// Favour read performance.
    ReadOptimized,
    /// No particular preference.
    Balanced,
}

/// Returns the tuning preset for bandit arm `arm`.
///
/// Arms `0..8` map to hand-picked presets; any other index falls back to
/// [`TuningConfig::default`].
fn get_arm_config(arm: usize) -> TuningConfig {
    const MIB: usize = 1024 * 1024;
    // One row per arm (should have NUM_ARMS rows). Columns: memtable (MiB), write buffers,
    // batch size, prefetch distance, flush interval (ms), compaction priority, cache ratio,
    // HNSW ef_search.
    #[rustfmt::skip]
    const PRESETS: [(usize, usize, usize, usize, u64, CompactionPriority, f64, usize); 8] = [
        ( 32, 4,   64,  2,   500, CompactionPriority::WriteOptimized, 0.30,  50),
        (256, 2, 1024, 16,  5000, CompactionPriority::ReadOptimized,  0.80, 200),
        (128, 2,  512,  8,  2000, CompactionPriority::Balanced,       0.90, 300),
        ( 64, 3,  256,  4,  1000, CompactionPriority::Balanced,       0.50, 100),
        (128, 6,  128,  2,   300, CompactionPriority::WriteOptimized, 0.20,  50),
        ( 32, 2,  512, 32,  3000, CompactionPriority::ReadOptimized,  0.95, 150),
        ( 16, 8,   32,  1,   200, CompactionPriority::Balanced,       0.60,  75),
        (512, 2, 2048, 64, 10000, CompactionPriority::WriteOptimized, 0.40, 100),
    ];

    match PRESETS.get(arm) {
        Some(&(memtable_mib, write_buffers, batch, prefetch, flush_ms, priority, cache, ef)) => {
            TuningConfig {
                memtable_size: memtable_mib * MIB,
                write_buffer_count: write_buffers,
                batch_size: batch,
                prefetch_distance: prefetch,
                flush_interval_ms: flush_ms,
                compaction_priority: priority,
                cache_ratio: cache,
                hnsw_ef_search: ef,
            }
        }
        None => TuningConfig::default(),
    }
}
/// Statistics for one arm of the UCB1 bandit.
struct UcbArm {
    /// Number of recorded rewards. Only incremented while the `total_reward` write lock is
    /// held, so a holder of the read lock observes a count that matches the total.
    count: AtomicU64,
    /// Sum of recorded rewards.
    total_reward: RwLock<f64>,
    /// Sum of squared rewards. Accumulated but not read in this file; presumably reserved
    /// for a variance-aware bound — TODO confirm.
    sum_squared_reward: RwLock<f64>,
}

impl UcbArm {
    /// Creates an arm that has never been played.
    fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            total_reward: RwLock::new(0.0),
            sum_squared_reward: RwLock::new(0.0),
        }
    }

    /// Mean recorded reward, or `0.0` if the arm has never been played.
    fn avg_reward(&self) -> f64 {
        // Load the count while holding the total's lock so both come from the same update;
        // loading it first (as before) could pair a new count with an old total.
        let total = self.total_reward.read().unwrap();
        match self.count.load(Ordering::Relaxed) {
            0 => 0.0,
            n => *total / n as f64,
        }
    }

    /// Records one reward for this arm.
    fn record_reward(&self, reward: f64) {
        let mut total = self.total_reward.write().unwrap();
        *total += reward;
        *self.sum_squared_reward.write().unwrap() += reward * reward;
        // Bump the count while the `total_reward` write lock is still held (it is released
        // when `total` drops at the end of this function); see `avg_reward`.
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// UCB1 score: mean reward plus `UCB_C * sqrt(ln(total_count) / count)`.
    ///
    /// An arm that has never been played scores `f64::MAX`, so it is always tried first.
    /// `total_count` is clamped to at least this arm's own count. Without the clamp, an
    /// inconsistent snapshot could make the logarithm negative (or `-inf` at zero) and the
    /// score `NaN`.
    fn ucb(&self, total_count: u64) -> f64 {
        let count = self.count.load(Ordering::Relaxed);
        if count == 0 {
            return f64::MAX;
        }
        let total = total_count.max(count) as f64;
        let exploration = UCB_C * (total.ln() / count as f64).sqrt();
        self.avg_reward() + exploration
    }
}
/// Tracks the operation mix, classifies the workload, and chooses tuning presets with a
/// UCB1 multi-armed bandit whose reward is normalized throughput.
pub struct WorkloadClassifier {
    /// Per-kind operation counts accumulated since construction.
    counters: OperationCounters,
    /// One bandit arm per tuning preset.
    arms: [UcbArm; NUM_ARMS],
    /// Index of the arm whose preset is currently active.
    current_arm: RwLock<usize>,
    /// The preset belonging to `current_arm`.
    current_config: RwLock<TuningConfig>,
    /// Construction time; the denominator for `ops_per_second`.
    start_time: Instant,
    // Initialized in `new` but never read in this file, hence the lint allowance.
    #[allow(dead_code)]
    last_feature_time: RwLock<Instant>,
    // Likewise initialized in `new` but never read in this file.
    #[allow(dead_code)]
    cached_features: RwLock<FeatureVector>,
    /// Start of the open reward-measurement interval, if any.
    reward_start: RwLock<Option<Instant>>,
    /// Total operation count snapshotted when the reward interval was opened.
    ops_at_reward_start: AtomicU64,
}
impl WorkloadClassifier {
    /// Creates a classifier with zeroed counters and unplayed bandit arms.
    ///
    /// The initial configuration is arm 3's preset. No reward interval is open until
    /// [`start_reward_measurement`](Self::start_reward_measurement) is called.
    pub fn new() -> Self {
        // Arm whose preset is installed before the bandit has made any choice.
        const INITIAL_ARM: usize = 3;
        Self {
            counters: OperationCounters::default(),
            arms: std::array::from_fn(|_| UcbArm::new()),
            current_arm: RwLock::new(INITIAL_ARM),
            current_config: RwLock::new(get_arm_config(INITIAL_ARM)),
            start_time: Instant::now(),
            last_feature_time: RwLock::new(Instant::now()),
            cached_features: RwLock::new(FeatureVector::default()),
            reward_start: RwLock::new(None),
            ops_at_reward_start: AtomicU64::new(0),
        }
    }

    /// Counts one point read.
    #[inline]
    pub fn record_point_read(&self) {
        self.counters.point_reads.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one range scan.
    #[inline]
    pub fn record_range_scan(&self) {
        self.counters.range_scans.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one insert.
    #[inline]
    pub fn record_insert(&self) {
        self.counters.inserts.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one update.
    #[inline]
    pub fn record_update(&self) {
        self.counters.updates.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one delete.
    #[inline]
    pub fn record_delete(&self) {
        self.counters.deletes.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one vector search.
    #[inline]
    pub fn record_vector_search(&self) {
        self.counters.vector_searches.fetch_add(1, Ordering::Relaxed);
    }

    /// Sum of all operation counters.
    ///
    /// Each counter is loaded independently. Under concurrent recording this is therefore a
    /// best-effort total rather than an atomic snapshot.
    fn total_ops(&self) -> u64 {
        self.counters.point_reads.load(Ordering::Relaxed)
            + self.counters.range_scans.load(Ordering::Relaxed)
            + self.counters.inserts.load(Ordering::Relaxed)
            + self.counters.updates.load(Ordering::Relaxed)
            + self.counters.deletes.load(Ordering::Relaxed)
            + self.counters.vector_searches.load(Ordering::Relaxed)
    }

    /// Builds a [`FeatureVector`] from everything recorded since construction.
    ///
    /// Counters are never reset, so the fractions and `ops_per_second` are lifetime averages.
    /// `avg_latency_ms` and `key_locality` are fixed placeholders; nothing measures them yet.
    pub fn extract_features(&self) -> FeatureVector {
        let reads = self.counters.point_reads.load(Ordering::Relaxed);
        let scans = self.counters.range_scans.load(Ordering::Relaxed);
        let inserts = self.counters.inserts.load(Ordering::Relaxed);
        let updates = self.counters.updates.load(Ordering::Relaxed);
        let deletes = self.counters.deletes.load(Ordering::Relaxed);
        let vectors = self.counters.vector_searches.load(Ordering::Relaxed);
        let total = reads + scans + inserts + updates + deletes + vectors;
        // Avoid dividing by zero before any operation is recorded.
        let total_f = total.max(1) as f64;
        // Avoid dividing by (nearly) zero right after construction.
        let elapsed = self.start_time.elapsed().as_secs_f64().max(0.001);
        FeatureVector {
            read_fraction: (reads + scans) as f64 / total_f,
            write_fraction: (inserts + updates + deletes) as f64 / total_f,
            scan_fraction: scans as f64 / total_f,
            vector_fraction: vectors as f64 / total_f,
            avg_latency_ms: 1.0, // placeholder
            ops_per_second: total as f64 / elapsed,
            key_locality: 0.5, // placeholder
        }
    }

    /// Classifies the workload from the current lifetime features.
    pub fn workload_type(&self) -> WorkloadType {
        self.extract_features().classify()
    }

    /// Returns a copy of the active tuning configuration.
    pub fn current_config(&self) -> TuningConfig {
        self.current_config.read().unwrap().clone()
    }

    /// Opens a reward interval by recording the current instant and total operation count.
    pub fn start_reward_measurement(&self) {
        // Take the op-count snapshot while holding the `reward_start` lock.
        // `end_reward_measurement` reads both values under the same lock, so it always sees a
        // matching (start instant, snapshot) pair.
        let mut start = self.reward_start.write().unwrap();
        self.ops_at_reward_start.store(self.total_ops(), Ordering::Relaxed);
        *start = Some(Instant::now());
    }

    /// Closes the open reward interval and credits the current arm with its reward.
    ///
    /// The reward is the interval's throughput, normalized so that 100 000 ops/s or more
    /// scores 1.0. Does nothing if no interval is open. Also does nothing if the interval has
    /// lasted under 1 ms; in that case it stays open.
    pub fn end_reward_measurement(&self) {
        let (elapsed, ops_start) = {
            let mut start = self.reward_start.write().unwrap();
            let elapsed = match *start {
                Some(t) => t.elapsed().as_secs_f64(),
                None => return,
            };
            if elapsed < 0.001 {
                return;
            }
            // Consume the interval under the lock, so two concurrent callers cannot both
            // credit it.
            *start = None;
            (elapsed, self.ops_at_reward_start.load(Ordering::Relaxed))
        };
        // Defensive: an inconsistent snapshot must never underflow. Plain subtraction would
        // panic in debug builds or wrap to a huge bogus reward in release builds.
        let ops_done = self.total_ops().saturating_sub(ops_start);
        let throughput = ops_done as f64 / elapsed;
        let reward = (throughput / 100_000.0).min(1.0);
        let arm_idx = *self.current_arm.read().unwrap();
        self.arms[arm_idx].record_reward(reward);
    }

    /// Chooses the next arm with UCB1.
    ///
    /// An arm that has never been played is chosen first (lowest such index). Otherwise the
    /// arm with the highest UCB score wins; on ties the highest index is chosen.
    pub fn select_arm(&self) -> usize {
        // Search for an unplayed arm instead of assuming arms were played in index order.
        // Rewards can reach arms out of order: for example, the initial arm 3 is credited if
        // `start_reward_measurement` runs before the first `update_config`. Indexing by the
        // total pull count would then replay a played arm and skip an unplayed one.
        if let Some(unplayed) = self
            .arms
            .iter()
            .position(|arm| arm.count.load(Ordering::Relaxed) == 0)
        {
            return unplayed;
        }
        let total_count: u64 = self
            .arms
            .iter()
            .map(|arm| arm.count.load(Ordering::Relaxed))
            .sum();
        // Score each arm once, rather than twice per comparison.
        self.arms
            .iter()
            .map(|arm| arm.ucb(total_count))
            .enumerate()
            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
            .map_or(0, |(i, _)| i)
    }

    /// Closes the current reward interval, switches to the arm chosen by
    /// [`select_arm`](Self::select_arm), and opens a new interval for it.
    ///
    /// Each step takes its own locks, so concurrent callers may interleave.
    pub fn update_config(&self) {
        self.end_reward_measurement();
        let new_arm = self.select_arm();
        let new_config = get_arm_config(new_arm);
        *self.current_arm.write().unwrap() = new_arm;
        *self.current_config.write().unwrap() = new_config;
        self.start_reward_measurement();
    }

    /// Returns a snapshot of the features, classification, active arm, and per-arm statistics.
    pub fn stats(&self) -> ClassifierStats {
        let features = self.extract_features();
        let arm_stats: Vec<_> = self
            .arms
            .iter()
            .enumerate()
            .map(|(i, arm)| ArmStats {
                arm_id: i,
                count: arm.count.load(Ordering::Relaxed),
                avg_reward: arm.avg_reward(),
            })
            .collect();
        ClassifierStats {
            workload_type: features.classify(),
            features,
            current_arm: *self.current_arm.read().unwrap(),
            arm_stats,
        }
    }
}
impl Default for WorkloadClassifier {
fn default() -> Self {
Self::new()
}
}
/// Per-arm bandit statistics, as reported by `WorkloadClassifier::stats`.
#[derive(Debug, Clone)]
pub struct ArmStats {
    /// Arm index; also the preset index passed to `get_arm_config`.
    pub arm_id: usize,
    /// Number of rewards recorded for the arm.
    pub count: u64,
    /// Mean recorded reward (`0.0` if the arm was never played).
    pub avg_reward: f64,
}
/// Snapshot of classifier state, as returned by `WorkloadClassifier::stats`.
#[derive(Debug, Clone)]
pub struct ClassifierStats {
    /// Classification derived from `features`.
    pub workload_type: WorkloadType,
    /// Features the classification was computed from.
    pub features: FeatureVector,
    /// Arm whose preset was active when the snapshot was taken.
    pub current_arm: usize,
    /// Per-arm bandit statistics, one entry per arm in index order.
    pub arm_stats: Vec<ArmStats>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_feature_extraction() {
        let classifier = WorkloadClassifier::new();
        for _ in 0..100 {
            classifier.record_insert();
            classifier.record_update();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }
        // 200 writes out of 250 operations -> write_fraction = 0.8.
        let features = classifier.extract_features();
        assert!(features.write_fraction > 0.5);
        assert_eq!(features.classify(), WorkloadType::Oltp);
    }

    #[test]
    fn test_olap_detection() {
        let classifier = WorkloadClassifier::new();
        for _ in 0..100 {
            classifier.record_range_scan();
            classifier.record_point_read();
        }
        for _ in 0..10 {
            classifier.record_insert();
        }
        // 100 scans out of 210 operations -> scan_fraction is about 0.48.
        let features = classifier.extract_features();
        assert_eq!(features.classify(), WorkloadType::Olap);
    }

    #[test]
    fn test_vector_search_detection() {
        let classifier = WorkloadClassifier::new();
        for _ in 0..100 {
            classifier.record_vector_search();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }
        // 100 of 150 operations are vector searches.
        let features = classifier.extract_features();
        assert_eq!(features.classify(), WorkloadType::VectorSearch);
    }

    #[test]
    fn test_ucb_arm_selection() {
        let classifier = WorkloadClassifier::new();
        // Play every arm once: even arms earn a high reward, odd arms a low one.
        for _ in 0..NUM_ARMS {
            let arm = classifier.select_arm();
            classifier.arms[arm].record_reward(if arm % 2 == 0 { 0.8 } else { 0.2 });
        }
        // All arms now have equal counts, so the exploration bonus is identical and the
        // higher mean reward of the even arms decides.
        let selected: Vec<_> = (0..20).map(|_| classifier.select_arm()).collect();
        let even_count = selected.iter().filter(|&&a| a % 2 == 0).count();
        assert!(even_count > 10);
    }

    #[test]
    fn test_config_update() {
        let classifier = WorkloadClassifier::new();
        // The classifier starts on arm 3's preset.
        let config1 = classifier.current_config();
        assert_eq!(config1.memtable_size, get_arm_config(3).memtable_size);
        for _ in 0..100 {
            classifier.record_insert();
        }
        classifier.start_reward_measurement();
        thread::sleep(Duration::from_millis(10));
        classifier.update_config();
        // After an update, the active config is the preset of whichever arm was selected.
        let config2 = classifier.current_config();
        assert!(config2.memtable_size > 0);
        assert_eq!(
            config2.memtable_size,
            get_arm_config(classifier.stats().current_arm).memtable_size
        );
    }

    #[test]
    fn test_arm_configs() {
        for i in 0..NUM_ARMS {
            let config = get_arm_config(i);
            assert!(config.memtable_size > 0);
            assert!(config.batch_size > 0);
            assert!(config.prefetch_distance > 0);
        }
    }

    #[test]
    fn test_stats() {
        let classifier = WorkloadClassifier::new();
        for _ in 0..50 {
            classifier.record_insert();
            classifier.record_point_read();
        }
        let stats = classifier.stats();
        assert_eq!(stats.arm_stats.len(), NUM_ARMS);
        assert_eq!(stats.current_arm, 3);
        assert!(stats.arm_stats.iter().all(|s| s.count == 0));
        assert!(stats.features.ops_per_second > 0.0);
    }
}