1use std::sync::atomic::{AtomicU64, Ordering};
42use std::sync::RwLock;
43use std::time::Instant;
44
/// Number of tuning presets ("arms") the bandit chooses between.
///
/// Must agree with the presets handled by `get_arm_config` (arms `0..8`).
const NUM_ARMS: usize = 8;

/// UCB1 exploration coefficient: the textbook value `sqrt(2)`.
///
/// Uses the std constant rather than a hand-typed, truncated literal. This keeps
/// full precision and avoids clippy's deny-by-default `approx_constant` lint.
const UCB_C: f64 = std::f64::consts::SQRT_2;

/// Size of a feature window.
// Not referenced by the code visible in this module, hence the allow.
#[allow(dead_code)]
const FEATURE_WINDOW_SIZE: usize = 1000;
54
/// Coarse workload category, as produced by `FeatureVector::classify`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Write-heavy traffic (writes above 70 % of operations).
    Oltp,
    /// Scan- or read-heavy traffic.
    Olap,
    /// No rule in `classify` matched.
    Mixed,
    /// Vector searches above 30 % of operations.
    VectorSearch,
    /// Fewer than one operation per second observed.
    Unknown,
}

impl WorkloadType {
    /// Returns a static, human-readable label for this category.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WorkloadType::Oltp => "OLTP",
            WorkloadType::Olap => "OLAP",
            WorkloadType::Mixed => "Mixed",
            WorkloadType::VectorSearch => "VectorSearch",
            WorkloadType::Unknown => "Unknown",
        }
    }
}
86
/// Lifetime operation counters, one per operation kind.
///
/// Every counter is an independent `AtomicU64`, so recording an operation is a
/// single lock-free increment. Loading several counters one after another is
/// therefore not an atomic snapshot of all of them.
#[derive(Default)]
struct OperationCounters {
    /// Point (single-key) reads.
    point_reads: AtomicU64,
    /// Range scans.
    range_scans: AtomicU64,
    /// Inserts.
    inserts: AtomicU64,
    /// Updates.
    updates: AtomicU64,
    /// Deletes.
    deletes: AtomicU64,
    /// Vector searches.
    vector_searches: AtomicU64,
}
107
108#[derive(Debug, Clone, Default)]
110pub struct FeatureVector {
111 pub read_fraction: f64,
113 pub write_fraction: f64,
115 pub scan_fraction: f64,
117 pub vector_fraction: f64,
119 pub avg_latency_ms: f64,
121 pub ops_per_second: f64,
123 pub key_locality: f64,
125}
126
127impl FeatureVector {
128 pub fn classify(&self) -> WorkloadType {
130 if self.ops_per_second < 1.0 {
131 return WorkloadType::Unknown;
132 }
133
134 if self.vector_fraction > 0.3 {
135 return WorkloadType::VectorSearch;
136 }
137
138 if self.write_fraction > 0.7 {
139 return WorkloadType::Oltp;
140 }
141
142 if self.scan_fraction > 0.3 {
143 return WorkloadType::Olap;
144 }
145
146 if self.read_fraction > 0.7 {
147 return WorkloadType::Olap;
148 }
149
150 WorkloadType::Mixed
151 }
152}
153
/// Storage-engine tuning knobs.
///
/// Each bandit arm maps to one preset via `get_arm_config`. The code that
/// consumes these values is not part of this module.
#[derive(Debug, Clone)]
pub struct TuningConfig {
    /// Memtable size. Presumably in bytes, since presets are written as
    /// `N * 1024 * 1024` (TODO confirm with the consumer).
    pub memtable_size: usize,
    /// Number of write buffers.
    pub write_buffer_count: usize,
    /// Batch size.
    pub batch_size: usize,
    /// Prefetch distance.
    pub prefetch_distance: usize,
    /// Flush interval, in milliseconds.
    pub flush_interval_ms: u64,
    /// Which side compaction should favour.
    pub compaction_priority: CompactionPriority,
    /// Cache ratio. Every preset in this module lies between 0.2 and 0.95.
    pub cache_ratio: f64,
    /// HNSW `ef_search` setting.
    pub hnsw_ef_search: usize,
}

impl Default for TuningConfig {
    /// Middle-of-the-road settings.
    ///
    /// Identical to arm 3's preset except for using two write buffers instead
    /// of three.
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        TuningConfig {
            memtable_size: 64 * MIB,
            write_buffer_count: 2,
            batch_size: 256,
            prefetch_distance: 4,
            flush_interval_ms: 1000,
            compaction_priority: CompactionPriority::Balanced,
            cache_ratio: 0.5,
            hnsw_ef_search: 100,
        }
    }
}

/// Compaction bias carried by a [`TuningConfig`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPriority {
    /// Favour writes.
    WriteOptimized,
    /// Favour reads.
    ReadOptimized,
    /// No particular bias.
    Balanced,
}
204
205fn get_arm_config(arm: usize) -> TuningConfig {
207 match arm {
208 0 => TuningConfig {
209 memtable_size: 32 * 1024 * 1024,
211 write_buffer_count: 4,
212 batch_size: 64,
213 prefetch_distance: 2,
214 flush_interval_ms: 500,
215 compaction_priority: CompactionPriority::WriteOptimized,
216 cache_ratio: 0.3,
217 hnsw_ef_search: 50,
218 },
219 1 => TuningConfig {
220 memtable_size: 256 * 1024 * 1024,
222 write_buffer_count: 2,
223 batch_size: 1024,
224 prefetch_distance: 16,
225 flush_interval_ms: 5000,
226 compaction_priority: CompactionPriority::ReadOptimized,
227 cache_ratio: 0.8,
228 hnsw_ef_search: 200,
229 },
230 2 => TuningConfig {
231 memtable_size: 128 * 1024 * 1024,
233 write_buffer_count: 2,
234 batch_size: 512,
235 prefetch_distance: 8,
236 flush_interval_ms: 2000,
237 compaction_priority: CompactionPriority::Balanced,
238 cache_ratio: 0.9,
239 hnsw_ef_search: 300,
240 },
241 3 => TuningConfig {
242 memtable_size: 64 * 1024 * 1024,
244 write_buffer_count: 3,
245 batch_size: 256,
246 prefetch_distance: 4,
247 flush_interval_ms: 1000,
248 compaction_priority: CompactionPriority::Balanced,
249 cache_ratio: 0.5,
250 hnsw_ef_search: 100,
251 },
252 4 => TuningConfig {
253 memtable_size: 128 * 1024 * 1024,
255 write_buffer_count: 6,
256 batch_size: 128,
257 prefetch_distance: 2,
258 flush_interval_ms: 300,
259 compaction_priority: CompactionPriority::WriteOptimized,
260 cache_ratio: 0.2,
261 hnsw_ef_search: 50,
262 },
263 5 => TuningConfig {
264 memtable_size: 32 * 1024 * 1024,
266 write_buffer_count: 2,
267 batch_size: 512,
268 prefetch_distance: 32,
269 flush_interval_ms: 3000,
270 compaction_priority: CompactionPriority::ReadOptimized,
271 cache_ratio: 0.95,
272 hnsw_ef_search: 150,
273 },
274 6 => TuningConfig {
275 memtable_size: 16 * 1024 * 1024,
277 write_buffer_count: 8,
278 batch_size: 32,
279 prefetch_distance: 1,
280 flush_interval_ms: 200,
281 compaction_priority: CompactionPriority::Balanced,
282 cache_ratio: 0.6,
283 hnsw_ef_search: 75,
284 },
285 7 => TuningConfig {
286 memtable_size: 512 * 1024 * 1024,
288 write_buffer_count: 2,
289 batch_size: 2048,
290 prefetch_distance: 64,
291 flush_interval_ms: 10000,
292 compaction_priority: CompactionPriority::WriteOptimized,
293 cache_ratio: 0.4,
294 hnsw_ef_search: 100,
295 },
296 _ => TuningConfig::default(),
297 }
298}
299
300struct UcbArm {
306 count: AtomicU64,
308 total_reward: RwLock<f64>,
310 sum_squared_reward: RwLock<f64>,
312}
313
314impl UcbArm {
315 fn new() -> Self {
316 Self {
317 count: AtomicU64::new(0),
318 total_reward: RwLock::new(0.0),
319 sum_squared_reward: RwLock::new(0.0),
320 }
321 }
322
323 fn avg_reward(&self) -> f64 {
325 let count = self.count.load(Ordering::Relaxed);
326 if count == 0 {
327 return 0.0;
328 }
329 *self.total_reward.read().unwrap() / count as f64
330 }
331
332 fn record_reward(&self, reward: f64) {
334 self.count.fetch_add(1, Ordering::Relaxed);
335 *self.total_reward.write().unwrap() += reward;
336 *self.sum_squared_reward.write().unwrap() += reward * reward;
337 }
338
339 fn ucb(&self, total_count: u64) -> f64 {
341 let count = self.count.load(Ordering::Relaxed);
342 if count == 0 {
343 return f64::MAX; }
345
346 let avg = self.avg_reward();
347 let exploration = UCB_C * ((total_count as f64).ln() / count as f64).sqrt();
348
349 avg + exploration
350 }
351}
352
353pub struct WorkloadClassifier {
359 counters: OperationCounters,
361 arms: [UcbArm; NUM_ARMS],
363 current_arm: RwLock<usize>,
365 current_config: RwLock<TuningConfig>,
367 start_time: Instant,
369 #[allow(dead_code)]
371 last_feature_time: RwLock<Instant>,
372 #[allow(dead_code)]
374 cached_features: RwLock<FeatureVector>,
375 reward_start: RwLock<Option<Instant>>,
377 ops_at_reward_start: AtomicU64,
379}
380
381impl WorkloadClassifier {
382 pub fn new() -> Self {
384 Self {
385 counters: OperationCounters::default(),
386 arms: std::array::from_fn(|_| UcbArm::new()),
387 current_arm: RwLock::new(3), current_config: RwLock::new(get_arm_config(3)),
389 start_time: Instant::now(),
390 last_feature_time: RwLock::new(Instant::now()),
391 cached_features: RwLock::new(FeatureVector::default()),
392 reward_start: RwLock::new(None),
393 ops_at_reward_start: AtomicU64::new(0),
394 }
395 }
396
397 #[inline]
399 pub fn record_point_read(&self) {
400 self.counters.point_reads.fetch_add(1, Ordering::Relaxed);
401 }
402
403 #[inline]
405 pub fn record_range_scan(&self) {
406 self.counters.range_scans.fetch_add(1, Ordering::Relaxed);
407 }
408
409 #[inline]
411 pub fn record_insert(&self) {
412 self.counters.inserts.fetch_add(1, Ordering::Relaxed);
413 }
414
415 #[inline]
417 pub fn record_update(&self) {
418 self.counters.updates.fetch_add(1, Ordering::Relaxed);
419 }
420
421 #[inline]
423 pub fn record_delete(&self) {
424 self.counters.deletes.fetch_add(1, Ordering::Relaxed);
425 }
426
427 #[inline]
429 pub fn record_vector_search(&self) {
430 self.counters.vector_searches.fetch_add(1, Ordering::Relaxed);
431 }
432
433 fn total_ops(&self) -> u64 {
435 self.counters.point_reads.load(Ordering::Relaxed)
436 + self.counters.range_scans.load(Ordering::Relaxed)
437 + self.counters.inserts.load(Ordering::Relaxed)
438 + self.counters.updates.load(Ordering::Relaxed)
439 + self.counters.deletes.load(Ordering::Relaxed)
440 + self.counters.vector_searches.load(Ordering::Relaxed)
441 }
442
443 pub fn extract_features(&self) -> FeatureVector {
445 let reads = self.counters.point_reads.load(Ordering::Relaxed);
446 let scans = self.counters.range_scans.load(Ordering::Relaxed);
447 let inserts = self.counters.inserts.load(Ordering::Relaxed);
448 let updates = self.counters.updates.load(Ordering::Relaxed);
449 let deletes = self.counters.deletes.load(Ordering::Relaxed);
450 let vectors = self.counters.vector_searches.load(Ordering::Relaxed);
451
452 let total = reads + scans + inserts + updates + deletes + vectors;
453 let total_f = total.max(1) as f64;
454
455 let elapsed = self.start_time.elapsed().as_secs_f64().max(0.001);
456
457 FeatureVector {
458 read_fraction: (reads + scans) as f64 / total_f,
459 write_fraction: (inserts + updates + deletes) as f64 / total_f,
460 scan_fraction: scans as f64 / total_f,
461 vector_fraction: vectors as f64 / total_f,
462 avg_latency_ms: 1.0, ops_per_second: total as f64 / elapsed,
464 key_locality: 0.5, }
466 }
467
468 pub fn workload_type(&self) -> WorkloadType {
470 self.extract_features().classify()
471 }
472
473 pub fn current_config(&self) -> TuningConfig {
475 self.current_config.read().unwrap().clone()
476 }
477
478 pub fn start_reward_measurement(&self) {
480 *self.reward_start.write().unwrap() = Some(Instant::now());
481 self.ops_at_reward_start.store(self.total_ops(), Ordering::Relaxed);
482 }
483
484 pub fn end_reward_measurement(&self) {
486 let start = match *self.reward_start.read().unwrap() {
487 Some(t) => t,
488 None => return,
489 };
490
491 let elapsed = start.elapsed().as_secs_f64();
492 if elapsed < 0.001 {
493 return;
494 }
495
496 let ops_start = self.ops_at_reward_start.load(Ordering::Relaxed);
497 let ops_now = self.total_ops();
498 let throughput = (ops_now - ops_start) as f64 / elapsed;
499
500 let reward = (throughput / 100000.0).min(1.0);
502
503 let arm_idx = *self.current_arm.read().unwrap();
505 self.arms[arm_idx].record_reward(reward);
506
507 *self.reward_start.write().unwrap() = None;
508 }
509
510 pub fn select_arm(&self) -> usize {
512 let total_count: u64 = self.arms.iter()
513 .map(|a| a.count.load(Ordering::Relaxed))
514 .sum();
515
516 if total_count < NUM_ARMS as u64 {
517 return total_count as usize;
519 }
520
521 self.arms.iter()
523 .enumerate()
524 .max_by(|(_, a), (_, b)| {
525 a.ucb(total_count)
526 .partial_cmp(&b.ucb(total_count))
527 .unwrap_or(std::cmp::Ordering::Equal)
528 })
529 .map(|(i, _)| i)
530 .unwrap_or(0)
531 }
532
533 pub fn update_config(&self) {
535 self.end_reward_measurement();
537
538 let new_arm = self.select_arm();
540 let new_config = get_arm_config(new_arm);
541
542 *self.current_arm.write().unwrap() = new_arm;
543 *self.current_config.write().unwrap() = new_config;
544
545 self.start_reward_measurement();
547 }
548
549 pub fn stats(&self) -> ClassifierStats {
551 let features = self.extract_features();
552 let arm_stats: Vec<_> = self.arms.iter()
553 .enumerate()
554 .map(|(i, arm)| ArmStats {
555 arm_id: i,
556 count: arm.count.load(Ordering::Relaxed),
557 avg_reward: arm.avg_reward(),
558 })
559 .collect();
560
561 ClassifierStats {
562 workload_type: features.classify(),
563 features,
564 current_arm: *self.current_arm.read().unwrap(),
565 arm_stats,
566 }
567 }
568}
569
570impl Default for WorkloadClassifier {
571 fn default() -> Self {
572 Self::new()
573 }
574}
575
/// Per-arm summary reported by `WorkloadClassifier::stats`.
#[derive(Debug, Clone)]
pub struct ArmStats {
    /// Index of the arm; also its preset number for `get_arm_config`.
    pub arm_id: usize,
    /// Number of rewards recorded for the arm.
    pub count: u64,
    /// Mean recorded reward (`0.0` for an arm that has never been rewarded).
    pub avg_reward: f64,
}
586
587#[derive(Debug, Clone)]
589pub struct ClassifierStats {
590 pub workload_type: WorkloadType,
592 pub features: FeatureVector,
594 pub current_arm: usize,
596 pub arm_stats: Vec<ArmStats>,
598}
599
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_feature_extraction() {
        let classifier = WorkloadClassifier::new();

        // 200 writes and 50 reads: write_fraction = 0.8.
        for _ in 0..100 {
            classifier.record_insert();
            classifier.record_update();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }

        let features = classifier.extract_features();
        assert!(features.write_fraction > 0.5);
        assert_eq!(features.classify(), WorkloadType::Oltp);
    }

    #[test]
    fn test_olap_detection() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..100 {
            classifier.record_range_scan();
            classifier.record_point_read();
        }
        for _ in 0..10 {
            classifier.record_insert();
        }

        let features = classifier.extract_features();
        assert_eq!(features.classify(), WorkloadType::Olap);
    }

    #[test]
    fn test_vector_search_detection() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..100 {
            classifier.record_vector_search();
        }
        for _ in 0..50 {
            classifier.record_point_read();
        }

        let features = classifier.extract_features();
        assert_eq!(features.classify(), WorkloadType::VectorSearch);
    }

    #[test]
    fn test_ucb_arm_selection() {
        let classifier = WorkloadClassifier::new();

        // Warm-up: every arm gets played once; even arms earn more reward.
        for _ in 0..NUM_ARMS {
            let arm = classifier.select_arm();
            classifier.arms[arm].record_reward(if arm % 2 == 0 { 0.8 } else { 0.2 });
        }
        assert!(classifier
            .arms
            .iter()
            .all(|a| a.count.load(Ordering::Relaxed) == 1));

        let selected: Vec<_> = (0..20).map(|_| classifier.select_arm()).collect();
        let even_count = selected.iter().filter(|&&a| a % 2 == 0).count();

        assert!(even_count > 10);
    }

    #[test]
    fn test_config_update() {
        let classifier = WorkloadClassifier::new();

        // The classifier starts on arm 3's preset.
        let initial = classifier.current_config();
        let arm3 = get_arm_config(3);
        assert_eq!(initial.memtable_size, arm3.memtable_size);
        assert_eq!(initial.write_buffer_count, arm3.write_buffer_count);

        // Generate traffic inside a measured interval.
        classifier.start_reward_measurement();
        for _ in 0..100 {
            classifier.record_insert();
        }
        thread::sleep(Duration::from_millis(10));
        classifier.update_config();

        // The interval was credited to the arm that was active (arm 3).
        assert_eq!(classifier.arms[3].count.load(Ordering::Relaxed), 1);
        assert!(classifier.arms[3].avg_reward() > 0.0);

        // The bandit moved on to an arm it had not tried yet.
        let stats = classifier.stats();
        assert_ne!(stats.current_arm, 3);
        assert_eq!(
            classifier.arms[stats.current_arm].count.load(Ordering::Relaxed),
            0
        );

        // The published config is exactly the selected arm's preset.
        let actual = classifier.current_config();
        let expected = get_arm_config(stats.current_arm);
        assert_eq!(actual.memtable_size, expected.memtable_size);
        assert_eq!(actual.write_buffer_count, expected.write_buffer_count);
        assert_eq!(actual.batch_size, expected.batch_size);
        assert_eq!(actual.prefetch_distance, expected.prefetch_distance);
        assert_eq!(actual.flush_interval_ms, expected.flush_interval_ms);
        assert_eq!(actual.compaction_priority, expected.compaction_priority);
        assert_eq!(actual.cache_ratio, expected.cache_ratio);
        assert_eq!(actual.hnsw_ef_search, expected.hnsw_ef_search);

        // A fresh measurement interval is open again.
        assert!(classifier.reward_start.read().unwrap().is_some());
    }

    #[test]
    fn test_arm_configs() {
        for i in 0..NUM_ARMS {
            let config = get_arm_config(i);
            assert!(config.memtable_size > 0);
            assert!(config.batch_size > 0);
            assert!(config.prefetch_distance > 0);
            assert!(config.cache_ratio > 0.0 && config.cache_ratio <= 1.0);
        }

        // Out-of-range arms fall back to the default preset.
        let fallback = get_arm_config(NUM_ARMS);
        let default = TuningConfig::default();
        assert_eq!(fallback.memtable_size, default.memtable_size);
        assert_eq!(fallback.write_buffer_count, default.write_buffer_count);
    }

    #[test]
    fn test_stats() {
        let classifier = WorkloadClassifier::new();

        for _ in 0..50 {
            classifier.record_insert();
            classifier.record_point_read();
        }

        let stats = classifier.stats();
        assert_eq!(stats.arm_stats.len(), NUM_ARMS);
        assert!(stats.features.ops_per_second > 0.0);
    }
}