1use std::sync::atomic::{AtomicU64, Ordering};
45use std::sync::RwLock;
46use std::time::Instant;
47
/// Number of bandit arms; `get_arm_config` defines one preset per index in `0..NUM_ARMS`.
const NUM_ARMS: usize = 8;

/// Weight of the exploration term in the UCB score (√2).
///
/// Taken from the standard library instead of a hand-truncated literal so the
/// value is exact to `f64` precision.
const UCB_C: f64 = std::f64::consts::SQRT_2;

/// Size of the feature window.
///
/// NOTE(review): the unit (operations? samples?) is not established by any
/// code in this file — confirm before using.
// Nothing in this file reads the constant yet, hence the lint allowance.
#[allow(dead_code)]
const FEATURE_WINDOW_SIZE: usize = 1000;
57
/// Coarse category assigned to an observed operation mix.
///
/// Values are produced by `FeatureVector::classify`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Labelled `"OLTP"`.
    Oltp,
    /// Labelled `"OLAP"`.
    Olap,
    /// Labelled `"Mixed"`.
    Mixed,
    /// Labelled `"VectorSearch"`.
    VectorSearch,
    /// Labelled `"Unknown"`.
    Unknown,
}

impl WorkloadType {
    /// Returns the fixed, human-readable label for this category.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WorkloadType::Oltp => "OLTP",
            WorkloadType::Olap => "OLAP",
            WorkloadType::Mixed => "Mixed",
            WorkloadType::VectorSearch => "VectorSearch",
            WorkloadType::Unknown => "Unknown",
        }
    }
}
89
/// One atomic tally per kind of operation; every tally starts at zero via `Default`.
#[derive(Default)]
struct OperationCounters {
    /// Point reads recorded so far.
    point_reads: AtomicU64,
    /// Range scans recorded so far.
    range_scans: AtomicU64,
    /// Inserts recorded so far.
    inserts: AtomicU64,
    /// Updates recorded so far.
    updates: AtomicU64,
    /// Deletes recorded so far.
    deletes: AtomicU64,
    /// Vector searches recorded so far.
    vector_searches: AtomicU64,
}
110
111#[derive(Debug, Clone, Default)]
113pub struct FeatureVector {
114 pub read_fraction: f64,
116 pub write_fraction: f64,
118 pub scan_fraction: f64,
120 pub vector_fraction: f64,
122 pub avg_latency_ms: f64,
124 pub ops_per_second: f64,
126 pub key_locality: f64,
128}
129
130impl FeatureVector {
131 pub fn classify(&self) -> WorkloadType {
133 if self.ops_per_second < 1.0 {
134 return WorkloadType::Unknown;
135 }
136
137 if self.vector_fraction > 0.3 {
138 return WorkloadType::VectorSearch;
139 }
140
141 if self.write_fraction > 0.7 {
142 return WorkloadType::Oltp;
143 }
144
145 if self.scan_fraction > 0.3 {
146 return WorkloadType::Olap;
147 }
148
149 if self.read_fraction > 0.7 {
150 return WorkloadType::Olap;
151 }
152
153 WorkloadType::Mixed
154 }
155}
156
157#[derive(Debug, Clone)]
163pub struct TuningConfig {
164 pub memtable_size: usize,
166 pub write_buffer_count: usize,
168 pub batch_size: usize,
170 pub prefetch_distance: usize,
172 pub flush_interval_ms: u64,
174 pub compaction_priority: CompactionPriority,
176 pub cache_ratio: f64,
178 pub hnsw_ef_search: usize,
180}
181
182impl Default for TuningConfig {
183 fn default() -> Self {
184 Self {
185 memtable_size: 64 * 1024 * 1024, write_buffer_count: 2,
187 batch_size: 256,
188 prefetch_distance: 4,
189 flush_interval_ms: 1000,
190 compaction_priority: CompactionPriority::Balanced,
191 cache_ratio: 0.5,
192 hnsw_ef_search: 100,
193 }
194 }
195}
196
/// Selects which side of the read/write trade-off compaction should favour.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPriority {
    /// Favour writes.
    WriteOptimized,
    /// Favour reads.
    ReadOptimized,
    /// No preference either way.
    Balanced,
}
207
208fn get_arm_config(arm: usize) -> TuningConfig {
210 match arm {
211 0 => TuningConfig {
212 memtable_size: 32 * 1024 * 1024,
214 write_buffer_count: 4,
215 batch_size: 64,
216 prefetch_distance: 2,
217 flush_interval_ms: 500,
218 compaction_priority: CompactionPriority::WriteOptimized,
219 cache_ratio: 0.3,
220 hnsw_ef_search: 50,
221 },
222 1 => TuningConfig {
223 memtable_size: 256 * 1024 * 1024,
225 write_buffer_count: 2,
226 batch_size: 1024,
227 prefetch_distance: 16,
228 flush_interval_ms: 5000,
229 compaction_priority: CompactionPriority::ReadOptimized,
230 cache_ratio: 0.8,
231 hnsw_ef_search: 200,
232 },
233 2 => TuningConfig {
234 memtable_size: 128 * 1024 * 1024,
236 write_buffer_count: 2,
237 batch_size: 512,
238 prefetch_distance: 8,
239 flush_interval_ms: 2000,
240 compaction_priority: CompactionPriority::Balanced,
241 cache_ratio: 0.9,
242 hnsw_ef_search: 300,
243 },
244 3 => TuningConfig {
245 memtable_size: 64 * 1024 * 1024,
247 write_buffer_count: 3,
248 batch_size: 256,
249 prefetch_distance: 4,
250 flush_interval_ms: 1000,
251 compaction_priority: CompactionPriority::Balanced,
252 cache_ratio: 0.5,
253 hnsw_ef_search: 100,
254 },
255 4 => TuningConfig {
256 memtable_size: 128 * 1024 * 1024,
258 write_buffer_count: 6,
259 batch_size: 128,
260 prefetch_distance: 2,
261 flush_interval_ms: 300,
262 compaction_priority: CompactionPriority::WriteOptimized,
263 cache_ratio: 0.2,
264 hnsw_ef_search: 50,
265 },
266 5 => TuningConfig {
267 memtable_size: 32 * 1024 * 1024,
269 write_buffer_count: 2,
270 batch_size: 512,
271 prefetch_distance: 32,
272 flush_interval_ms: 3000,
273 compaction_priority: CompactionPriority::ReadOptimized,
274 cache_ratio: 0.95,
275 hnsw_ef_search: 150,
276 },
277 6 => TuningConfig {
278 memtable_size: 16 * 1024 * 1024,
280 write_buffer_count: 8,
281 batch_size: 32,
282 prefetch_distance: 1,
283 flush_interval_ms: 200,
284 compaction_priority: CompactionPriority::Balanced,
285 cache_ratio: 0.6,
286 hnsw_ef_search: 75,
287 },
288 7 => TuningConfig {
289 memtable_size: 512 * 1024 * 1024,
291 write_buffer_count: 2,
292 batch_size: 2048,
293 prefetch_distance: 64,
294 flush_interval_ms: 10000,
295 compaction_priority: CompactionPriority::WriteOptimized,
296 cache_ratio: 0.4,
297 hnsw_ef_search: 100,
298 },
299 _ => TuningConfig::default(),
300 }
301}
302
303struct UcbArm {
309 count: AtomicU64,
311 total_reward: RwLock<f64>,
313 sum_squared_reward: RwLock<f64>,
315}
316
317impl UcbArm {
318 fn new() -> Self {
319 Self {
320 count: AtomicU64::new(0),
321 total_reward: RwLock::new(0.0),
322 sum_squared_reward: RwLock::new(0.0),
323 }
324 }
325
326 fn avg_reward(&self) -> f64 {
328 let count = self.count.load(Ordering::Relaxed);
329 if count == 0 {
330 return 0.0;
331 }
332 *self.total_reward.read().unwrap() / count as f64
333 }
334
335 fn record_reward(&self, reward: f64) {
337 self.count.fetch_add(1, Ordering::Relaxed);
338 *self.total_reward.write().unwrap() += reward;
339 *self.sum_squared_reward.write().unwrap() += reward * reward;
340 }
341
342 fn ucb(&self, total_count: u64) -> f64 {
344 let count = self.count.load(Ordering::Relaxed);
345 if count == 0 {
346 return f64::MAX; }
348
349 let avg = self.avg_reward();
350 let exploration = UCB_C * ((total_count as f64).ln() / count as f64).sqrt();
351
352 avg + exploration
353 }
354}
355
356pub struct WorkloadClassifier {
362 counters: OperationCounters,
364 arms: [UcbArm; NUM_ARMS],
366 current_arm: RwLock<usize>,
368 current_config: RwLock<TuningConfig>,
370 start_time: Instant,
372 #[allow(dead_code)]
374 last_feature_time: RwLock<Instant>,
375 #[allow(dead_code)]
377 cached_features: RwLock<FeatureVector>,
378 reward_start: RwLock<Option<Instant>>,
380 ops_at_reward_start: AtomicU64,
382}
383
384impl WorkloadClassifier {
385 pub fn new() -> Self {
387 Self {
388 counters: OperationCounters::default(),
389 arms: std::array::from_fn(|_| UcbArm::new()),
390 current_arm: RwLock::new(3), current_config: RwLock::new(get_arm_config(3)),
392 start_time: Instant::now(),
393 last_feature_time: RwLock::new(Instant::now()),
394 cached_features: RwLock::new(FeatureVector::default()),
395 reward_start: RwLock::new(None),
396 ops_at_reward_start: AtomicU64::new(0),
397 }
398 }
399
400 #[inline]
402 pub fn record_point_read(&self) {
403 self.counters.point_reads.fetch_add(1, Ordering::Relaxed);
404 }
405
406 #[inline]
408 pub fn record_range_scan(&self) {
409 self.counters.range_scans.fetch_add(1, Ordering::Relaxed);
410 }
411
412 #[inline]
414 pub fn record_insert(&self) {
415 self.counters.inserts.fetch_add(1, Ordering::Relaxed);
416 }
417
418 #[inline]
420 pub fn record_update(&self) {
421 self.counters.updates.fetch_add(1, Ordering::Relaxed);
422 }
423
424 #[inline]
426 pub fn record_delete(&self) {
427 self.counters.deletes.fetch_add(1, Ordering::Relaxed);
428 }
429
430 #[inline]
432 pub fn record_vector_search(&self) {
433 self.counters.vector_searches.fetch_add(1, Ordering::Relaxed);
434 }
435
436 fn total_ops(&self) -> u64 {
438 self.counters.point_reads.load(Ordering::Relaxed)
439 + self.counters.range_scans.load(Ordering::Relaxed)
440 + self.counters.inserts.load(Ordering::Relaxed)
441 + self.counters.updates.load(Ordering::Relaxed)
442 + self.counters.deletes.load(Ordering::Relaxed)
443 + self.counters.vector_searches.load(Ordering::Relaxed)
444 }
445
446 pub fn extract_features(&self) -> FeatureVector {
448 let reads = self.counters.point_reads.load(Ordering::Relaxed);
449 let scans = self.counters.range_scans.load(Ordering::Relaxed);
450 let inserts = self.counters.inserts.load(Ordering::Relaxed);
451 let updates = self.counters.updates.load(Ordering::Relaxed);
452 let deletes = self.counters.deletes.load(Ordering::Relaxed);
453 let vectors = self.counters.vector_searches.load(Ordering::Relaxed);
454
455 let total = reads + scans + inserts + updates + deletes + vectors;
456 let total_f = total.max(1) as f64;
457
458 let elapsed = self.start_time.elapsed().as_secs_f64().max(0.001);
459
460 FeatureVector {
461 read_fraction: (reads + scans) as f64 / total_f,
462 write_fraction: (inserts + updates + deletes) as f64 / total_f,
463 scan_fraction: scans as f64 / total_f,
464 vector_fraction: vectors as f64 / total_f,
465 avg_latency_ms: 1.0, ops_per_second: total as f64 / elapsed,
467 key_locality: 0.5, }
469 }
470
471 pub fn workload_type(&self) -> WorkloadType {
473 self.extract_features().classify()
474 }
475
476 pub fn current_config(&self) -> TuningConfig {
478 self.current_config.read().unwrap().clone()
479 }
480
481 pub fn start_reward_measurement(&self) {
483 *self.reward_start.write().unwrap() = Some(Instant::now());
484 self.ops_at_reward_start.store(self.total_ops(), Ordering::Relaxed);
485 }
486
487 pub fn end_reward_measurement(&self) {
489 let start = match *self.reward_start.read().unwrap() {
490 Some(t) => t,
491 None => return,
492 };
493
494 let elapsed = start.elapsed().as_secs_f64();
495 if elapsed < 0.001 {
496 return;
497 }
498
499 let ops_start = self.ops_at_reward_start.load(Ordering::Relaxed);
500 let ops_now = self.total_ops();
501 let throughput = (ops_now - ops_start) as f64 / elapsed;
502
503 let reward = (throughput / 100000.0).min(1.0);
505
506 let arm_idx = *self.current_arm.read().unwrap();
508 self.arms[arm_idx].record_reward(reward);
509
510 *self.reward_start.write().unwrap() = None;
511 }
512
513 pub fn select_arm(&self) -> usize {
515 let total_count: u64 = self.arms.iter()
516 .map(|a| a.count.load(Ordering::Relaxed))
517 .sum();
518
519 if total_count < NUM_ARMS as u64 {
520 return total_count as usize;
522 }
523
524 self.arms.iter()
526 .enumerate()
527 .max_by(|(_, a), (_, b)| {
528 a.ucb(total_count)
529 .partial_cmp(&b.ucb(total_count))
530 .unwrap_or(std::cmp::Ordering::Equal)
531 })
532 .map(|(i, _)| i)
533 .unwrap_or(0)
534 }
535
536 pub fn update_config(&self) {
538 self.end_reward_measurement();
540
541 let new_arm = self.select_arm();
543 let new_config = get_arm_config(new_arm);
544
545 *self.current_arm.write().unwrap() = new_arm;
546 *self.current_config.write().unwrap() = new_config;
547
548 self.start_reward_measurement();
550 }
551
552 pub fn stats(&self) -> ClassifierStats {
554 let features = self.extract_features();
555 let arm_stats: Vec<_> = self.arms.iter()
556 .enumerate()
557 .map(|(i, arm)| ArmStats {
558 arm_id: i,
559 count: arm.count.load(Ordering::Relaxed),
560 avg_reward: arm.avg_reward(),
561 })
562 .collect();
563
564 ClassifierStats {
565 workload_type: features.classify(),
566 features,
567 current_arm: *self.current_arm.read().unwrap(),
568 arm_stats,
569 }
570 }
571}
572
573impl Default for WorkloadClassifier {
574 fn default() -> Self {
575 Self::new()
576 }
577}
578
/// Reporting snapshot for one bandit arm.
#[derive(Debug, Clone)]
pub struct ArmStats {
    /// Index of the arm.
    pub arm_id: usize,
    /// Number of rewards recorded for the arm.
    pub count: u64,
    /// Mean recorded reward (`0.0` when `count` is zero).
    pub avg_reward: f64,
}
589
590#[derive(Debug, Clone)]
592pub struct ClassifierStats {
593 pub workload_type: WorkloadType,
595 pub features: FeatureVector,
597 pub current_arm: usize,
599 pub arm_stats: Vec<ArmStats>,
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// 200 writes against 50 reads must classify as OLTP.
    #[test]
    fn test_feature_extraction() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..100 {
            classifier.record_insert();
            classifier.record_update();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }

        let features = classifier.extract_features();
        assert!(features.write_fraction > 0.5);

        let workload = features.classify();
        assert_eq!(workload, WorkloadType::Oltp);
    }

    /// A scan-heavy mix with few writes must classify as OLAP.
    #[test]
    fn test_olap_detection() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..100 {
            classifier.record_range_scan();
            classifier.record_point_read();
        }
        for _ in 0..10 {
            classifier.record_insert();
        }

        let features = classifier.extract_features();
        let workload = features.classify();
        assert_eq!(workload, WorkloadType::Olap);
    }

    /// A mix dominated by vector searches must classify as VectorSearch.
    #[test]
    fn test_vector_search_detection() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..100 {
            classifier.record_vector_search();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }

        let features = classifier.extract_features();
        let workload = features.classify();
        assert_eq!(workload, WorkloadType::VectorSearch);
    }

    /// After one pull per arm, with even arms rewarded more, selection must
    /// favour even arms.
    #[test]
    fn test_ucb_arm_selection() {
        let classifier = WorkloadClassifier::new();

        // Warm-up: one pull per arm.
        for _ in 0..NUM_ARMS {
            let arm = classifier.select_arm();
            classifier.arms[arm].record_reward(if arm % 2 == 0 { 0.8 } else { 0.2 });
        }

        let selected: Vec<_> = (0..20).map(|_| classifier.select_arm()).collect();
        let even_count = selected.iter().filter(|&&a| a % 2 == 0).count();

        assert!(even_count > 10);
    }

    /// A completed measurement followed by `update_config` must move the
    /// classifier off its initial preset.
    #[test]
    fn test_config_update() {
        let classifier = WorkloadClassifier::new();

        let before = classifier.current_config();

        for _ in 0..100 {
            classifier.record_insert();
        }

        classifier.start_reward_measurement();
        // Must exceed the 1 ms minimum that `end_reward_measurement` requires.
        thread::sleep(Duration::from_millis(10));
        classifier.update_config();

        let after = classifier.current_config();
        assert!(after.memtable_size > 0);
        // The initial arm has now been credited, so warm-up moves to another
        // arm, and no other warm-up preset shares the initial memtable size.
        assert_ne!(after.memtable_size, before.memtable_size);
    }

    /// Every arm's preset must have non-zero sizes.
    #[test]
    fn test_arm_configs() {
        for i in 0..NUM_ARMS {
            let config = get_arm_config(i);
            assert!(config.memtable_size > 0);
            assert!(config.batch_size > 0);
            assert!(config.prefetch_distance > 0);
        }
    }

    /// `stats` must report one entry per arm and a positive throughput.
    #[test]
    fn test_stats() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..50 {
            classifier.record_insert();
            classifier.record_point_read();
        }

        let stats = classifier.stats();
        assert_eq!(stats.arm_stats.len(), NUM_ARMS);
        assert!(stats.features.ops_per_second > 0.0);
    }
}