1use std::sync::Arc;
62use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
63use std::time::{Duration, Instant};
64
/// Resource budget for one class of queries.
///
/// Holds hard limits for RAM, SSD and CPU consumption together with a latency
/// target and recall goals. [`CostTracker`] compares its running counters
/// against the resource limits and the latency target stored here.
#[derive(Debug, Clone)]
pub struct QueryBudget {
    /// Name of the query class this budget belongs to (for example
    /// `"low_latency"`); also used as the key in admission control and in the
    /// class registry.
    pub query_class: String,

    /// Maximum number of RAM bytes a query may be charged.
    pub ram_bytes_limit: u64,

    /// Maximum number of random SSD reads a query may be charged.
    pub ssd_random_reads_limit: u32,

    /// Maximum number of sequentially read SSD bytes a query may be charged.
    pub ssd_sequential_bytes_limit: u64,

    /// Maximum number of CPU cycles a query may be charged.
    pub cpu_cycles_limit: u64,

    /// Wall-clock time after which a tracked query counts as exhausted.
    pub latency_target: Duration,

    /// Recall goal for the query class. The presets use values between 0 and 1;
    /// nothing in the code visible here reads this field.
    pub recall_target: f32,

    /// Confidence attached to `recall_target`. Presumably the probability with
    /// which the recall goal should hold — NOTE(review): confirm with callers.
    pub recall_confidence: f32,
}
100
101impl QueryBudget {
102 pub fn new(query_class: &str) -> Self {
104 Self {
105 query_class: query_class.to_string(),
106 ram_bytes_limit: u64::MAX,
107 ssd_random_reads_limit: u32::MAX,
108 ssd_sequential_bytes_limit: u64::MAX,
109 cpu_cycles_limit: u64::MAX,
110 latency_target: Duration::from_millis(100),
111 recall_target: 0.95,
112 recall_confidence: 0.99,
113 }
114 }
115
116 pub fn ram_bytes(mut self, limit: u64) -> Self {
118 self.ram_bytes_limit = limit;
119 self
120 }
121
122 pub fn ssd_random_reads(mut self, limit: u32) -> Self {
124 self.ssd_random_reads_limit = limit;
125 self
126 }
127
128 pub fn ssd_sequential_bytes(mut self, limit: u64) -> Self {
130 self.ssd_sequential_bytes_limit = limit;
131 self
132 }
133
134 pub fn cpu_cycles(mut self, limit: u64) -> Self {
136 self.cpu_cycles_limit = limit;
137 self
138 }
139
140 pub fn latency_target(mut self, target: Duration) -> Self {
142 self.latency_target = target;
143 self
144 }
145
146 pub fn recall_target(mut self, target: f32, confidence: f32) -> Self {
148 self.recall_target = target;
149 self.recall_confidence = confidence;
150 self
151 }
152
153 pub fn from_sla(
159 query_class: &str,
160 latency_target: Duration,
161 recall_target: f32,
162 hardware: &HardwareProfile,
163 ) -> Self {
164 let t_ns = latency_target.as_nanos() as u64;
165
166 let ram_bytes = (hardware.ram_bandwidth_gbps as u64 * t_ns / 2) / 1_000_000_000;
169
170 let ssd_random = (t_ns / hardware.ssd_random_latency_ns) as u32;
173
174 let ssd_seq = (hardware.ssd_seq_bandwidth_mbps as u64 * t_ns) / 1_000_000_000;
176
177 let cpu_cycles = t_ns * hardware.cpu_freq_ghz as u64;
179
180 Self {
181 query_class: query_class.to_string(),
182 ram_bytes_limit: ram_bytes,
183 ssd_random_reads_limit: ssd_random,
184 ssd_sequential_bytes_limit: ssd_seq,
185 cpu_cycles_limit: cpu_cycles,
186 latency_target,
187 recall_target,
188 recall_confidence: 0.99,
189 }
190 }
191
192 pub fn low_latency() -> Self {
194 Self::new("low_latency")
195 .ram_bytes(4 * 1024 * 1024) .ssd_random_reads(0) .ssd_sequential_bytes(0) .cpu_cycles(500_000_000) .latency_target(Duration::from_millis(5))
200 .recall_target(0.80, 0.95)
201 }
202
203 pub fn balanced() -> Self {
205 Self::new("balanced")
206 .ram_bytes(16 * 1024 * 1024) .ssd_random_reads(0) .ssd_sequential_bytes(2 * 1024 * 1024) .cpu_cycles(2_000_000_000) .latency_target(Duration::from_millis(20))
211 .recall_target(0.90, 0.99)
212 }
213
214 pub fn high_recall() -> Self {
216 Self::new("high_recall")
217 .ram_bytes(64 * 1024 * 1024) .ssd_random_reads(16) .ssd_sequential_bytes(8 * 1024 * 1024) .cpu_cycles(10_000_000_000) .latency_target(Duration::from_millis(100))
222 .recall_target(0.99, 0.999)
223 }
224}
225
/// Performance characteristics of the machine a query runs on.
///
/// `QueryBudget::from_sla` reads these values to turn a latency target into
/// per-resource limits.
#[derive(Debug, Clone)]
pub struct HardwareProfile {
    /// RAM bandwidth. The `_gbps` suffix and the preset values (25–100)
    /// suggest gigabytes per second — NOTE(review): confirm the unit.
    pub ram_bandwidth_gbps: f32,

    /// Latency of a single random SSD read, in nanoseconds.
    pub ssd_random_latency_ns: u64,

    /// Sequential SSD bandwidth. The `_mbps` suffix and the preset values
    /// (500–5000) suggest megabytes per second — NOTE(review): confirm the unit.
    pub ssd_seq_bandwidth_mbps: u32,

    /// CPU clock frequency in GHz.
    pub cpu_freq_ghz: f32,

    /// Size of the last-level cache in bytes. Not read by any code visible
    /// in this file.
    pub llc_size_bytes: usize,
}
248
249impl Default for HardwareProfile {
250 fn default() -> Self {
251 Self {
252 ram_bandwidth_gbps: 50.0, ssd_random_latency_ns: 100_000, ssd_seq_bandwidth_mbps: 3000, cpu_freq_ghz: 3.5, llc_size_bytes: 32 * 1024 * 1024, }
258 }
259}
260
261impl HardwareProfile {
262 pub fn high_end_server() -> Self {
264 Self {
265 ram_bandwidth_gbps: 100.0,
266 ssd_random_latency_ns: 50_000,
267 ssd_seq_bandwidth_mbps: 5000,
268 cpu_freq_ghz: 3.8,
269 llc_size_bytes: 48 * 1024 * 1024,
270 }
271 }
272
273 pub fn standard_server() -> Self {
275 Self::default()
276 }
277
278 pub fn embedded() -> Self {
280 Self {
281 ram_bandwidth_gbps: 25.0,
282 ssd_random_latency_ns: 200_000,
283 ssd_seq_bandwidth_mbps: 500,
284 cpu_freq_ghz: 2.0,
285 llc_size_bytes: 8 * 1024 * 1024,
286 }
287 }
288}
289
/// Tracks the resources a single query has consumed against its [`QueryBudget`].
///
/// Costs are recorded through `&self` on atomic counters. As soon as a limit
/// or the latency target is exceeded the tracker is flagged as exhausted and
/// the first recorded reason is kept.
#[derive(Debug)]
pub struct CostTracker {
    /// Budget whose limits the counters below are compared against.
    budget: QueryBudget,

    /// RAM bytes charged so far.
    ram_bytes: AtomicU64,

    /// Random SSD reads charged so far (kept as u64 although the limit is a u32).
    ssd_random_reads: AtomicU64,

    /// Sequential SSD bytes charged so far.
    ssd_sequential_bytes: AtomicU64,

    /// CPU cycles charged so far.
    cpu_cycles: AtomicU64,

    /// Moment the tracker was created; latency is measured from here.
    start_time: Instant,

    /// Set once any limit or the latency target has been exceeded.
    exhausted: std::sync::atomic::AtomicBool,

    /// First reason for which the budget was marked exhausted, if any.
    exhaustion_reason: parking_lot::Mutex<Option<BudgetExhaustionReason>>,
}
321
/// Which limit of a [`QueryBudget`] caused a [`CostTracker`] to be marked exhausted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetExhaustionReason {
    /// Charged RAM bytes went above `ram_bytes_limit`.
    RamBytesExceeded,
    /// Charged random SSD reads went above `ssd_random_reads_limit`.
    SsdRandomReadsExceeded,
    /// Charged sequential SSD bytes went above `ssd_sequential_bytes_limit`.
    SsdSequentialBytesExceeded,
    /// Charged CPU cycles went above `cpu_cycles_limit`.
    CpuCyclesExceeded,
    /// Elapsed wall-clock time went above `latency_target`.
    LatencyTargetExceeded,
}
331
332impl CostTracker {
333 pub fn new(budget: QueryBudget) -> Self {
335 Self {
336 budget,
337 ram_bytes: AtomicU64::new(0),
338 ssd_random_reads: AtomicU64::new(0),
339 ssd_sequential_bytes: AtomicU64::new(0),
340 cpu_cycles: AtomicU64::new(0),
341 start_time: Instant::now(),
342 exhausted: std::sync::atomic::AtomicBool::new(false),
343 exhaustion_reason: parking_lot::Mutex::new(None),
344 }
345 }
346
347 #[inline]
349 pub fn add_ram_bytes(&self, bytes: u64) -> bool {
350 let new_total = self.ram_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
351 if new_total > self.budget.ram_bytes_limit {
352 self.mark_exhausted(BudgetExhaustionReason::RamBytesExceeded);
353 false
354 } else {
355 true
356 }
357 }
358
359 #[inline]
361 pub fn add_ssd_random_read(&self) -> bool {
362 let new_total = self.ssd_random_reads.fetch_add(1, Ordering::Relaxed) + 1;
363 if new_total > self.budget.ssd_random_reads_limit as u64 {
364 self.mark_exhausted(BudgetExhaustionReason::SsdRandomReadsExceeded);
365 false
366 } else {
367 true
368 }
369 }
370
371 #[inline]
373 pub fn add_ssd_sequential_bytes(&self, bytes: u64) -> bool {
374 let new_total = self
375 .ssd_sequential_bytes
376 .fetch_add(bytes, Ordering::Relaxed)
377 + bytes;
378 if new_total > self.budget.ssd_sequential_bytes_limit {
379 self.mark_exhausted(BudgetExhaustionReason::SsdSequentialBytesExceeded);
380 false
381 } else {
382 true
383 }
384 }
385
386 #[inline]
388 pub fn add_cpu_cycles(&self, cycles: u64) -> bool {
389 let new_total = self.cpu_cycles.fetch_add(cycles, Ordering::Relaxed) + cycles;
390 if new_total > self.budget.cpu_cycles_limit {
391 self.mark_exhausted(BudgetExhaustionReason::CpuCyclesExceeded);
392 false
393 } else {
394 true
395 }
396 }
397
398 #[inline]
400 pub fn check_latency(&self) -> bool {
401 if self.start_time.elapsed() > self.budget.latency_target {
402 self.mark_exhausted(BudgetExhaustionReason::LatencyTargetExceeded);
403 false
404 } else {
405 true
406 }
407 }
408
409 fn mark_exhausted(&self, reason: BudgetExhaustionReason) {
411 self.exhausted.store(true, Ordering::Release);
412 let mut guard = self.exhaustion_reason.lock();
413 if guard.is_none() {
414 *guard = Some(reason);
415 }
416 }
417
418 #[inline]
420 pub fn is_exhausted(&self) -> bool {
421 if self.start_time.elapsed() > self.budget.latency_target {
423 self.mark_exhausted(BudgetExhaustionReason::LatencyTargetExceeded);
424 }
425 self.exhausted.load(Ordering::Acquire)
426 }
427
428 pub fn exhaustion_reason(&self) -> Option<BudgetExhaustionReason> {
430 *self.exhaustion_reason.lock()
431 }
432
433 pub fn remaining_ram_bytes(&self) -> u64 {
435 self.budget
436 .ram_bytes_limit
437 .saturating_sub(self.ram_bytes.load(Ordering::Relaxed))
438 }
439
440 pub fn remaining_ssd_random_reads(&self) -> u32 {
442 self.budget
443 .ssd_random_reads_limit
444 .saturating_sub(self.ssd_random_reads.load(Ordering::Relaxed) as u32)
445 }
446
447 pub fn remaining_time(&self) -> Duration {
449 self.budget
450 .latency_target
451 .saturating_sub(self.start_time.elapsed())
452 }
453
454 pub fn utilization(&self) -> CostUtilization {
456 CostUtilization {
457 ram_bytes_ratio: self.ram_bytes.load(Ordering::Relaxed) as f64
458 / self.budget.ram_bytes_limit.max(1) as f64,
459 ssd_random_reads_ratio: self.ssd_random_reads.load(Ordering::Relaxed) as f64
460 / self.budget.ssd_random_reads_limit.max(1) as f64,
461 ssd_sequential_bytes_ratio: self.ssd_sequential_bytes.load(Ordering::Relaxed) as f64
462 / self.budget.ssd_sequential_bytes_limit.max(1) as f64,
463 cpu_cycles_ratio: self.cpu_cycles.load(Ordering::Relaxed) as f64
464 / self.budget.cpu_cycles_limit.max(1) as f64,
465 latency_ratio: self.start_time.elapsed().as_nanos() as f64
466 / self.budget.latency_target.as_nanos().max(1) as f64,
467 }
468 }
469
470 pub fn summary(&self) -> CostSummary {
472 CostSummary {
473 query_class: self.budget.query_class.clone(),
474 ram_bytes_used: self.ram_bytes.load(Ordering::Relaxed),
475 ram_bytes_limit: self.budget.ram_bytes_limit,
476 ssd_random_reads_used: self.ssd_random_reads.load(Ordering::Relaxed) as u32,
477 ssd_random_reads_limit: self.budget.ssd_random_reads_limit,
478 ssd_sequential_bytes_used: self.ssd_sequential_bytes.load(Ordering::Relaxed),
479 ssd_sequential_bytes_limit: self.budget.ssd_sequential_bytes_limit,
480 cpu_cycles_used: self.cpu_cycles.load(Ordering::Relaxed),
481 cpu_cycles_limit: self.budget.cpu_cycles_limit,
482 elapsed: self.start_time.elapsed(),
483 latency_target: self.budget.latency_target,
484 exhausted: self.is_exhausted(),
485 exhaustion_reason: self.exhaustion_reason(),
486 }
487 }
488}
489
/// Used/limit ratios for every resource tracked by a [`CostTracker`].
///
/// Each value is the consumed amount divided by the corresponding limit, so
/// values above 1.0 mean that limit was exceeded.
#[derive(Debug, Clone)]
pub struct CostUtilization {
    /// RAM bytes charged divided by the RAM byte limit.
    pub ram_bytes_ratio: f64,
    /// Random SSD reads charged divided by the random-read limit.
    pub ssd_random_reads_ratio: f64,
    /// Sequential SSD bytes charged divided by the sequential byte limit.
    pub ssd_sequential_bytes_ratio: f64,
    /// CPU cycles charged divided by the CPU cycle limit.
    pub cpu_cycles_ratio: f64,
    /// Elapsed time divided by the latency target.
    pub latency_ratio: f64,
}
499
/// Point-in-time snapshot of a [`CostTracker`]: usage, limits and exhaustion state.
#[derive(Debug, Clone)]
pub struct CostSummary {
    /// Query class of the budget being tracked.
    pub query_class: String,
    /// RAM bytes charged so far.
    pub ram_bytes_used: u64,
    /// RAM byte limit from the budget.
    pub ram_bytes_limit: u64,
    /// Random SSD reads charged so far.
    pub ssd_random_reads_used: u32,
    /// Random SSD read limit from the budget.
    pub ssd_random_reads_limit: u32,
    /// Sequential SSD bytes charged so far.
    pub ssd_sequential_bytes_used: u64,
    /// Sequential SSD byte limit from the budget.
    pub ssd_sequential_bytes_limit: u64,
    /// CPU cycles charged so far.
    pub cpu_cycles_used: u64,
    /// CPU cycle limit from the budget.
    pub cpu_cycles_limit: u64,
    /// Time elapsed since the tracker was created.
    pub elapsed: Duration,
    /// Latency target from the budget.
    pub latency_target: Duration,
    /// Whether any limit or the latency target had been exceeded.
    pub exhausted: bool,
    /// First recorded reason for exhaustion, if any.
    pub exhaustion_reason: Option<BudgetExhaustionReason>,
}
517
/// Decides whether a new query may start, based on per-class concurrency
/// limits and a global memory ceiling.
///
/// Successful admissions hand out an [`AdmissionTicket`]; dropping the ticket
/// gives the reserved capacity back.
pub struct AdmissionController {
    /// Maximum number of concurrently admitted queries per class name, as
    /// configured through `set_class_limit`.
    max_concurrent_per_class: parking_lot::RwLock<std::collections::HashMap<String, usize>>,

    /// Per-class counters that `try_admit` increments and ticket release
    /// decrements.
    active_per_class: parking_lot::RwLock<std::collections::HashMap<String, AtomicUsize>>,

    /// Sum of the estimated memory of all currently admitted queries.
    global_memory_pressure: AtomicU64,

    /// Ceiling for `global_memory_pressure`; admissions that would exceed it
    /// are rejected.
    max_global_memory: u64,

    /// Pause between admission attempts in `admit_with_backpressure`.
    backpressure_wait: Duration,
}
542
/// Proof that a query was admitted by an [`AdmissionController`].
///
/// Dropping the ticket calls back into the controller to release what was
/// reserved at admission time.
pub struct AdmissionTicket {
    /// Class of the admitted query; identifies the class counter to release.
    query_class: String,
    /// Memory estimate that was added to the controller's global pressure.
    estimated_memory: u64,
    /// Controller that issued the ticket and receives the release on drop.
    controller: Arc<AdmissionController>,
}
549
550impl Drop for AdmissionTicket {
551 fn drop(&mut self) {
552 self.controller
553 .release(&self.query_class, self.estimated_memory);
554 }
555}
556
557impl AdmissionController {
558 pub fn new(max_global_memory: u64) -> Arc<Self> {
560 Arc::new(Self {
561 max_concurrent_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
562 active_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
563 global_memory_pressure: AtomicU64::new(0),
564 max_global_memory,
565 backpressure_wait: Duration::from_millis(10),
566 })
567 }
568
569 pub fn set_class_limit(self: &Arc<Self>, query_class: &str, max_concurrent: usize) {
571 self.max_concurrent_per_class
572 .write()
573 .insert(query_class.to_string(), max_concurrent);
574 }
575
576 pub fn try_admit(self: &Arc<Self>, budget: &QueryBudget) -> Option<AdmissionTicket> {
580 let class_limits = self.max_concurrent_per_class.read();
582 if let Some(&limit) = class_limits.get(&budget.query_class) {
583 let mut active = self.active_per_class.write();
584 let counter = active
585 .entry(budget.query_class.clone())
586 .or_insert_with(|| AtomicUsize::new(0));
587
588 let current = counter.load(Ordering::Acquire);
589 if current >= limit {
590 return None;
591 }
592 counter.fetch_add(1, Ordering::AcqRel);
593 }
594 drop(class_limits);
595
596 let estimated_memory = budget.ram_bytes_limit / 2; let current = self
599 .global_memory_pressure
600 .fetch_add(estimated_memory, Ordering::AcqRel);
601
602 if current + estimated_memory > self.max_global_memory {
603 self.global_memory_pressure
605 .fetch_sub(estimated_memory, Ordering::AcqRel);
606 self.release_class_counter(&budget.query_class);
607 return None;
608 }
609
610 Some(AdmissionTicket {
611 query_class: budget.query_class.clone(),
612 estimated_memory,
613 controller: Arc::clone(self),
614 })
615 }
616
617 pub fn admit_with_backpressure(
619 self: &Arc<Self>,
620 budget: &QueryBudget,
621 max_wait: Duration,
622 ) -> Option<AdmissionTicket> {
623 let deadline = Instant::now() + max_wait;
624
625 loop {
626 if let Some(ticket) = self.try_admit(budget) {
627 return Some(ticket);
628 }
629
630 if Instant::now() >= deadline {
631 return None;
632 }
633
634 std::thread::sleep(self.backpressure_wait);
635 }
636 }
637
638 fn release(&self, query_class: &str, estimated_memory: u64) {
640 self.global_memory_pressure
641 .fetch_sub(estimated_memory, Ordering::AcqRel);
642 self.release_class_counter(query_class);
643 }
644
645 fn release_class_counter(&self, query_class: &str) {
646 let active = self.active_per_class.read();
647 if let Some(counter) = active.get(query_class) {
648 counter.fetch_sub(1, Ordering::AcqRel);
649 }
650 }
651
652 pub fn metrics(&self) -> AdmissionMetrics {
654 let active = self.active_per_class.read();
655 let active_per_class: std::collections::HashMap<String, usize> = active
656 .iter()
657 .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
658 .collect();
659
660 AdmissionMetrics {
661 global_memory_pressure: self.global_memory_pressure.load(Ordering::Relaxed),
662 max_global_memory: self.max_global_memory,
663 memory_utilization: self.global_memory_pressure.load(Ordering::Relaxed) as f64
664 / self.max_global_memory as f64,
665 active_per_class,
666 }
667 }
668}
669
/// Snapshot of an [`AdmissionController`]'s state, as returned by its `metrics` method.
#[derive(Debug, Clone)]
pub struct AdmissionMetrics {
    /// Sum of the memory estimates of all currently admitted queries.
    pub global_memory_pressure: u64,
    /// Configured ceiling for `global_memory_pressure`.
    pub max_global_memory: u64,
    /// `global_memory_pressure` divided by `max_global_memory`.
    pub memory_utilization: f64,
    /// Value of each per-class admission counter, keyed by class name.
    pub active_per_class: std::collections::HashMap<String, usize>,
}
678
/// Registry of named [`QueryBudget`]s together with the hardware profile used
/// to derive new budgets from an SLA.
pub struct QueryClassRegistry {
    /// Registered budgets keyed by their query class name.
    classes: parking_lot::RwLock<std::collections::HashMap<String, QueryBudget>>,
    /// Hardware profile passed to `QueryBudget::from_sla` by `create_from_sla`.
    hardware: HardwareProfile,
}
688
689impl QueryClassRegistry {
690 pub fn new(hardware: HardwareProfile) -> Self {
692 let mut classes = std::collections::HashMap::new();
693 classes.insert("low_latency".to_string(), QueryBudget::low_latency());
694 classes.insert("balanced".to_string(), QueryBudget::balanced());
695 classes.insert("high_recall".to_string(), QueryBudget::high_recall());
696
697 Self {
698 classes: parking_lot::RwLock::new(classes),
699 hardware,
700 }
701 }
702
703 pub fn register(&self, budget: QueryBudget) {
705 self.classes
706 .write()
707 .insert(budget.query_class.clone(), budget);
708 }
709
710 pub fn get(&self, query_class: &str) -> Option<QueryBudget> {
712 self.classes.read().get(query_class).cloned()
713 }
714
715 pub fn create_from_sla(
717 &self,
718 query_class: &str,
719 latency_target: Duration,
720 recall_target: f32,
721 ) -> QueryBudget {
722 QueryBudget::from_sla(query_class, latency_target, recall_target, &self.hardware)
723 }
724}
725
726impl Default for QueryClassRegistry {
727 fn default() -> Self {
728 Self::new(HardwareProfile::default())
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls override exactly the limits they name.
    #[test]
    fn test_budget_creation() {
        let one_mib: u64 = 1024 * 1024;
        let built = QueryBudget::new("test")
            .ram_bytes(one_mib)
            .ssd_random_reads(10)
            .latency_target(Duration::from_millis(50));

        assert_eq!(built.query_class, "test");
        assert_eq!(built.ram_bytes_limit, one_mib);
        assert_eq!(built.ssd_random_reads_limit, 10);
    }

    /// Charges within the limit succeed; the first charge over it exhausts
    /// the tracker and records the RAM reason.
    #[test]
    fn test_cost_tracker() {
        let limits = QueryBudget::new("test").ram_bytes(1000).ssd_random_reads(2);
        let tracker = CostTracker::new(limits);

        // 500 + 400 = 900 bytes stays below the 1000-byte limit.
        for chunk in &[500_u64, 400] {
            assert!(tracker.add_ram_bytes(*chunk));
            assert!(!tracker.is_exhausted());
        }

        // 900 + 200 = 1100 bytes crosses the limit.
        let within_budget = tracker.add_ram_bytes(200);
        assert!(!within_budget);
        assert!(tracker.is_exhausted());
        assert_eq!(
            tracker.exhaustion_reason(),
            Some(BudgetExhaustionReason::RamBytesExceeded)
        );
    }

    /// A class limit of two admits two tickets, rejects the third and admits
    /// again once a ticket has been dropped.
    #[test]
    fn test_admission_controller() {
        let controller = AdmissionController::new(64 * 1024 * 1024);
        controller.set_class_limit("low_latency", 2);

        let budget = QueryBudget::low_latency();

        let first = controller.try_admit(&budget);
        assert!(first.is_some());

        let second = controller.try_admit(&budget);
        assert!(second.is_some());

        // Both slots are taken.
        let rejected = controller.try_admit(&budget);
        assert!(rejected.is_none());

        // Releasing one ticket frees a slot.
        drop(first);

        let readmitted = controller.try_admit(&budget);
        assert!(readmitted.is_some());
    }

    /// A budget derived from an SLA has non-zero RAM and CPU limits.
    #[test]
    fn test_budget_from_sla() {
        let profile = HardwareProfile::default();
        let sla = Duration::from_millis(50);
        let derived = QueryBudget::from_sla("custom", sla, 0.95, &profile);

        assert!(derived.ram_bytes_limit > 0);
        assert!(derived.cpu_cycles_limit > 0);
    }
}