use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// Per-query resource budget: upper bounds on RAM bytes, SSD reads, CPU
/// cycles and wall-clock latency, together with a recall target.
///
/// `CostTracker` compares its running totals against these limits, and the
/// class name is the key used by `QueryClassRegistry` and
/// `AdmissionController`.
#[derive(Debug, Clone)]
pub struct QueryBudget {
/// Name of the query class this budget belongs to.
pub query_class: String,
/// Upper bound on the RAM byte total accumulated by `CostTracker::add_ram_bytes`.
pub ram_bytes_limit: u64,
/// Upper bound on the number of SSD random reads counted by the tracker.
pub ssd_random_reads_limit: u32,
/// Upper bound on the SSD sequential byte total accumulated by the tracker.
pub ssd_sequential_bytes_limit: u64,
/// Upper bound on the CPU cycle total accumulated by the tracker.
pub cpu_cycles_limit: u64,
/// Wall-clock time after which the tracker reports the budget as exhausted.
pub latency_target: Duration,
/// Requested recall; not read by any code in this file.
/// NOTE(review): presumably a fraction in `0.0..=1.0` — confirm with consumers.
pub recall_target: f32,
/// Confidence attached to `recall_target`; not read by any code in this file.
/// NOTE(review): presumably a probability in `0.0..=1.0` — confirm with consumers.
pub recall_confidence: f32,
}
impl QueryBudget {
    /// Creates an effectively unconstrained budget for `query_class`.
    ///
    /// Every resource limit starts at the maximum value of its type, the
    /// latency target is 100 ms, and the recall target is 0.95 at confidence
    /// 0.99. Use the builder methods below to tighten individual limits.
    pub fn new(query_class: &str) -> Self {
        Self {
            query_class: query_class.to_string(),
            ram_bytes_limit: u64::MAX,
            ssd_random_reads_limit: u32::MAX,
            ssd_sequential_bytes_limit: u64::MAX,
            cpu_cycles_limit: u64::MAX,
            latency_target: Duration::from_millis(100),
            recall_target: 0.95,
            recall_confidence: 0.99,
        }
    }

    /// Builder: sets the RAM byte limit and returns the updated budget.
    pub fn ram_bytes(mut self, limit: u64) -> Self {
        self.ram_bytes_limit = limit;
        self
    }

    /// Builder: sets the SSD random-read limit and returns the updated budget.
    pub fn ssd_random_reads(mut self, limit: u32) -> Self {
        self.ssd_random_reads_limit = limit;
        self
    }

    /// Builder: sets the SSD sequential byte limit and returns the updated budget.
    pub fn ssd_sequential_bytes(mut self, limit: u64) -> Self {
        self.ssd_sequential_bytes_limit = limit;
        self
    }

    /// Builder: sets the CPU cycle limit and returns the updated budget.
    pub fn cpu_cycles(mut self, limit: u64) -> Self {
        self.cpu_cycles_limit = limit;
        self
    }

    /// Builder: sets the latency target and returns the updated budget.
    pub fn latency_target(mut self, target: Duration) -> Self {
        self.latency_target = target;
        self
    }

    /// Builder: sets the recall target and its confidence.
    ///
    /// The values are stored as given; no range validation is performed.
    pub fn recall_target(mut self, target: f32, confidence: f32) -> Self {
        self.recall_target = target;
        self.recall_confidence = confidence;
        self
    }

    /// Derives a budget from a latency SLA and a hardware profile.
    ///
    /// Each limit is "what the hardware can do within `latency_target`":
    ///
    /// * RAM bytes: half of the RAM bandwidth over the latency window.
    /// * SSD random reads: how many reads of `ssd_random_latency_ns` fit in
    ///   the window (unbounded, `u32::MAX`, when that latency is zero).
    /// * SSD sequential bytes: sequential bandwidth over the window.
    /// * CPU cycles: clock frequency times the window.
    ///
    /// Bandwidths are interpreted as decimal giga/megabytes per second, so
    /// 1 GB/s is 1 byte per nanosecond and 1 MB/s is 1 byte per microsecond.
    /// NOTE(review): confirm that `ram_bandwidth_gbps` is gigabytes, not
    /// gigabits, per second.
    ///
    /// All results saturate at the maximum of their type instead of
    /// overflowing, and the fractional part of the floating-point hardware
    /// figures (e.g. 3.5 GHz) is taken into account. `recall_confidence` is
    /// fixed at 0.99.
    pub fn from_sla(
        query_class: &str,
        latency_target: Duration,
        recall_target: f32,
        hardware: &HardwareProfile,
    ) -> Self {
        let window_ns = latency_target.as_nanos();
        let window_ns_f = window_ns as f64;

        // GB/s * ns = bytes; only half of the bandwidth is budgeted.
        // Float-to-integer `as` casts saturate (and map NaN to 0).
        let ram_bytes = (f64::from(hardware.ram_bandwidth_gbps) * window_ns_f * 0.5) as u64;

        let ssd_random = match hardware.ssd_random_latency_ns {
            // A zero per-read latency cannot bound the read count.
            0 => u32::MAX,
            per_read_ns => {
                let reads = window_ns / u128::from(per_read_ns);
                if reads > u128::from(u32::MAX) {
                    u32::MAX
                } else {
                    reads as u32
                }
            }
        };

        // MB/s * ns / 1000 = bytes. The u128 product cannot overflow:
        // a u32 factor times a nanosecond count below 2^95.
        let ssd_seq_wide = u128::from(hardware.ssd_seq_bandwidth_mbps) * window_ns / 1_000;
        let ssd_seq = if ssd_seq_wide > u128::from(u64::MAX) {
            u64::MAX
        } else {
            ssd_seq_wide as u64
        };

        // GHz * ns = cycles.
        let cpu_cycles = (f64::from(hardware.cpu_freq_ghz) * window_ns_f) as u64;

        Self {
            query_class: query_class.to_string(),
            ram_bytes_limit: ram_bytes,
            ssd_random_reads_limit: ssd_random,
            ssd_sequential_bytes_limit: ssd_seq,
            cpu_cycles_limit: cpu_cycles,
            latency_target,
            recall_target,
            recall_confidence: 0.99,
        }
    }

    /// Preset `"low_latency"`: 4 MiB of RAM traffic, no SSD access,
    /// 500M cycles, 5 ms latency, recall 0.80 at confidence 0.95.
    pub fn low_latency() -> Self {
        Self::new("low_latency")
            .ram_bytes(4 * 1024 * 1024)
            .ssd_random_reads(0)
            .ssd_sequential_bytes(0)
            .cpu_cycles(500_000_000)
            .latency_target(Duration::from_millis(5))
            .recall_target(0.80, 0.95)
    }

    /// Preset `"balanced"`: 16 MiB of RAM traffic, no SSD random reads,
    /// 2 MiB of SSD sequential reads, 2G cycles, 20 ms latency,
    /// recall 0.90 at confidence 0.99.
    pub fn balanced() -> Self {
        Self::new("balanced")
            .ram_bytes(16 * 1024 * 1024)
            .ssd_random_reads(0)
            .ssd_sequential_bytes(2 * 1024 * 1024)
            .cpu_cycles(2_000_000_000)
            .latency_target(Duration::from_millis(20))
            .recall_target(0.90, 0.99)
    }

    /// Preset `"high_recall"`: 64 MiB of RAM traffic, 16 SSD random reads,
    /// 8 MiB of SSD sequential reads, 10G cycles, 100 ms latency,
    /// recall 0.99 at confidence 0.999.
    pub fn high_recall() -> Self {
        Self::new("high_recall")
            .ram_bytes(64 * 1024 * 1024)
            .ssd_random_reads(16)
            .ssd_sequential_bytes(8 * 1024 * 1024)
            .cpu_cycles(10_000_000_000)
            .latency_target(Duration::from_millis(100))
            .recall_target(0.99, 0.999)
    }
}
/// Performance figures of the host hardware, consumed by
/// `QueryBudget::from_sla` to turn a latency target into resource limits.
#[derive(Debug, Clone)]
pub struct HardwareProfile {
/// RAM bandwidth.
/// NOTE(review): the name suggests giga(bytes or bits) per second — confirm the unit.
pub ram_bandwidth_gbps: f32,
/// Latency of a single SSD random read, in nanoseconds (per the field name).
pub ssd_random_latency_ns: u64,
/// SSD sequential bandwidth.
/// NOTE(review): the name suggests mega(bytes or bits) per second — confirm the unit.
pub ssd_seq_bandwidth_mbps: u32,
/// CPU clock frequency in GHz (per the field name).
pub cpu_freq_ghz: f32,
/// Last-level cache size in bytes (per the field name); not read by any code in this file.
pub llc_size_bytes: usize,
}
impl Default for HardwareProfile {
fn default() -> Self {
Self {
ram_bandwidth_gbps: 50.0, ssd_random_latency_ns: 100_000, ssd_seq_bandwidth_mbps: 3000, cpu_freq_ghz: 3.5, llc_size_bytes: 32 * 1024 * 1024, }
}
}
impl HardwareProfile {
pub fn high_end_server() -> Self {
Self {
ram_bandwidth_gbps: 100.0,
ssd_random_latency_ns: 50_000,
ssd_seq_bandwidth_mbps: 5000,
cpu_freq_ghz: 3.8,
llc_size_bytes: 48 * 1024 * 1024,
}
}
pub fn standard_server() -> Self {
Self::default()
}
pub fn embedded() -> Self {
Self {
ram_bandwidth_gbps: 25.0,
ssd_random_latency_ns: 200_000,
ssd_seq_bandwidth_mbps: 500,
cpu_freq_ghz: 2.0,
llc_size_bytes: 8 * 1024 * 1024,
}
}
}
/// Accumulates the resources one query has consumed and compares them with
/// its `QueryBudget`.
///
/// Counters are atomics and the methods take `&self`, so a tracker can be
/// updated through a shared reference.
#[derive(Debug)]
pub struct CostTracker {
/// Limits the running totals are compared against.
budget: QueryBudget,
/// Running total of RAM bytes reported via `add_ram_bytes`.
ram_bytes: AtomicU64,
/// Running count of SSD random reads reported via `add_ssd_random_read`.
ssd_random_reads: AtomicU64,
/// Running total of SSD sequential bytes reported via `add_ssd_sequential_bytes`.
ssd_sequential_bytes: AtomicU64,
/// Running total of CPU cycles reported via `add_cpu_cycles`.
cpu_cycles: AtomicU64,
/// Instant captured in `new`; elapsed time is measured from here.
start_time: Instant,
/// Set once any limit has been exceeded; never reset by the code in this file.
exhausted: std::sync::atomic::AtomicBool,
/// First recorded reason for exhaustion, `None` until a limit is exceeded.
exhaustion_reason: parking_lot::Mutex<Option<BudgetExhaustionReason>>,
}
/// Which limit of a `QueryBudget` was exceeded, as recorded by `CostTracker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetExhaustionReason {
/// Recorded by `CostTracker::add_ram_bytes` when the RAM byte total passes its limit.
RamBytesExceeded,
/// Recorded by `CostTracker::add_ssd_random_read` when the read count passes its limit.
SsdRandomReadsExceeded,
/// Recorded by `CostTracker::add_ssd_sequential_bytes` when the byte total passes its limit.
SsdSequentialBytesExceeded,
/// Recorded by `CostTracker::add_cpu_cycles` when the cycle total passes its limit.
CpuCyclesExceeded,
/// Recorded when the time elapsed since tracker creation exceeds the latency target.
LatencyTargetExceeded,
}
impl CostTracker {
    /// Starts tracking consumption against `budget`.
    ///
    /// All counters start at zero and the latency clock starts now.
    pub fn new(budget: QueryBudget) -> Self {
        Self {
            budget,
            ram_bytes: AtomicU64::new(0),
            ssd_random_reads: AtomicU64::new(0),
            ssd_sequential_bytes: AtomicU64::new(0),
            cpu_cycles: AtomicU64::new(0),
            start_time: Instant::now(),
            exhausted: std::sync::atomic::AtomicBool::new(false),
            exhaustion_reason: parking_lot::Mutex::new(None),
        }
    }

    /// Adds `amount` to `counter` and checks the new total against `limit`.
    ///
    /// The amount is recorded even when it pushes the total over the limit.
    /// Returns `true` while the total is within the limit; otherwise marks
    /// the tracker exhausted with `reason` and returns `false`.
    #[inline]
    fn charge(
        &self,
        counter: &AtomicU64,
        amount: u64,
        limit: u64,
        reason: BudgetExhaustionReason,
    ) -> bool {
        // Saturate the local sum so an enormous charge reads as "over the
        // limit" instead of overflowing (a panic in debug builds).
        let total = counter
            .fetch_add(amount, Ordering::Relaxed)
            .saturating_add(amount);
        if total > limit {
            self.mark_exhausted(reason);
            false
        } else {
            true
        }
    }

    /// Records `bytes` of RAM traffic; returns `false` once the RAM byte
    /// limit is exceeded.
    #[inline]
    pub fn add_ram_bytes(&self, bytes: u64) -> bool {
        self.charge(
            &self.ram_bytes,
            bytes,
            self.budget.ram_bytes_limit,
            BudgetExhaustionReason::RamBytesExceeded,
        )
    }

    /// Records one SSD random read; returns `false` once the read limit is
    /// exceeded.
    #[inline]
    pub fn add_ssd_random_read(&self) -> bool {
        self.charge(
            &self.ssd_random_reads,
            1,
            u64::from(self.budget.ssd_random_reads_limit),
            BudgetExhaustionReason::SsdRandomReadsExceeded,
        )
    }

    /// Records `bytes` of SSD sequential reads; returns `false` once the
    /// sequential byte limit is exceeded.
    #[inline]
    pub fn add_ssd_sequential_bytes(&self, bytes: u64) -> bool {
        self.charge(
            &self.ssd_sequential_bytes,
            bytes,
            self.budget.ssd_sequential_bytes_limit,
            BudgetExhaustionReason::SsdSequentialBytesExceeded,
        )
    }

    /// Records `cycles` of CPU work; returns `false` once the cycle limit is
    /// exceeded.
    #[inline]
    pub fn add_cpu_cycles(&self, cycles: u64) -> bool {
        self.charge(
            &self.cpu_cycles,
            cycles,
            self.budget.cpu_cycles_limit,
            BudgetExhaustionReason::CpuCyclesExceeded,
        )
    }

    /// Returns `true` while the elapsed time is within the latency target;
    /// otherwise marks the tracker exhausted and returns `false`.
    #[inline]
    pub fn check_latency(&self) -> bool {
        if self.start_time.elapsed() > self.budget.latency_target {
            self.mark_exhausted(BudgetExhaustionReason::LatencyTargetExceeded);
            false
        } else {
            true
        }
    }

    /// Records the first exhaustion reason and raises the exhausted flag.
    fn mark_exhausted(&self, reason: BudgetExhaustionReason) {
        {
            let mut first_reason = self.exhaustion_reason.lock();
            if first_reason.is_none() {
                *first_reason = Some(reason);
            }
        }
        // Publish the flag only after the reason is stored, so that a thread
        // observing `is_exhausted() == true` never reads a `None` reason.
        self.exhausted.store(true, Ordering::Release);
    }

    /// Returns whether any limit, including the latency target, has been
    /// exceeded. Once `true`, the result stays `true`.
    #[inline]
    pub fn is_exhausted(&self) -> bool {
        // Fast path: the flag is sticky, so once it is set there is no need
        // to read the clock or take the reason lock again.
        if self.exhausted.load(Ordering::Acquire) {
            return true;
        }
        if !self.check_latency() {
            return true;
        }
        self.exhausted.load(Ordering::Acquire)
    }

    /// Returns the first recorded reason for exhaustion, if any.
    pub fn exhaustion_reason(&self) -> Option<BudgetExhaustionReason> {
        *self.exhaustion_reason.lock()
    }

    /// RAM bytes left before the limit is reached (zero when over budget).
    pub fn remaining_ram_bytes(&self) -> u64 {
        self.budget
            .ram_bytes_limit
            .saturating_sub(self.ram_bytes.load(Ordering::Relaxed))
    }

    /// SSD random reads left before the limit is reached (zero when over
    /// budget).
    pub fn remaining_ssd_random_reads(&self) -> u32 {
        let used = self.ssd_random_reads.load(Ordering::Relaxed);
        // Subtract in u64: truncating the counter to u32 first would make a
        // count above u32::MAX look small again.
        let left = u64::from(self.budget.ssd_random_reads_limit).saturating_sub(used);
        // `left` never exceeds the u32 limit, so the cast is lossless.
        left as u32
    }

    /// Time left before the latency target is reached (zero when past it).
    pub fn remaining_time(&self) -> Duration {
        self.budget
            .latency_target
            .saturating_sub(self.start_time.elapsed())
    }

    /// Returns used/limit ratios for every tracked resource.
    ///
    /// A limit of zero is treated as one to avoid dividing by zero, so ratios
    /// can exceed 1.0 when a resource is over budget.
    pub fn utilization(&self) -> CostUtilization {
        CostUtilization {
            ram_bytes_ratio: self.ram_bytes.load(Ordering::Relaxed) as f64
                / self.budget.ram_bytes_limit.max(1) as f64,
            ssd_random_reads_ratio: self.ssd_random_reads.load(Ordering::Relaxed) as f64
                / self.budget.ssd_random_reads_limit.max(1) as f64,
            ssd_sequential_bytes_ratio: self.ssd_sequential_bytes.load(Ordering::Relaxed) as f64
                / self.budget.ssd_sequential_bytes_limit.max(1) as f64,
            cpu_cycles_ratio: self.cpu_cycles.load(Ordering::Relaxed) as f64
                / self.budget.cpu_cycles_limit.max(1) as f64,
            latency_ratio: self.start_time.elapsed().as_nanos() as f64
                / self.budget.latency_target.as_nanos().max(1) as f64,
        }
    }

    /// Returns a snapshot of usage, limits and exhaustion state.
    ///
    /// The counters are read one after another, so under concurrent updates
    /// the snapshot is not a single atomic view.
    pub fn summary(&self) -> CostSummary {
        // The summary field is u32 while the counter is u64: clamp rather
        // than truncate.
        let random_reads = self.ssd_random_reads.load(Ordering::Relaxed);
        let random_reads_used = random_reads.min(u64::from(u32::MAX)) as u32;
        CostSummary {
            query_class: self.budget.query_class.clone(),
            ram_bytes_used: self.ram_bytes.load(Ordering::Relaxed),
            ram_bytes_limit: self.budget.ram_bytes_limit,
            ssd_random_reads_used: random_reads_used,
            ssd_random_reads_limit: self.budget.ssd_random_reads_limit,
            ssd_sequential_bytes_used: self.ssd_sequential_bytes.load(Ordering::Relaxed),
            ssd_sequential_bytes_limit: self.budget.ssd_sequential_bytes_limit,
            cpu_cycles_used: self.cpu_cycles.load(Ordering::Relaxed),
            cpu_cycles_limit: self.budget.cpu_cycles_limit,
            elapsed: self.start_time.elapsed(),
            latency_target: self.budget.latency_target,
            // Evaluated before the reason so a latency overrun detected here
            // is reflected in `exhaustion_reason` as well.
            exhausted: self.is_exhausted(),
            exhaustion_reason: self.exhaustion_reason(),
        }
    }
}
/// Used/limit ratios produced by `CostTracker::utilization`.
///
/// Each ratio divides the current total by its limit, with a zero limit
/// treated as one.
#[derive(Debug, Clone)]
pub struct CostUtilization {
/// RAM bytes used divided by the RAM byte limit.
pub ram_bytes_ratio: f64,
/// SSD random reads used divided by the random-read limit.
pub ssd_random_reads_ratio: f64,
/// SSD sequential bytes used divided by the sequential byte limit.
pub ssd_sequential_bytes_ratio: f64,
/// CPU cycles used divided by the cycle limit.
pub cpu_cycles_ratio: f64,
/// Elapsed nanoseconds divided by the latency target in nanoseconds.
pub latency_ratio: f64,
}
/// Snapshot of usage, limits and exhaustion state produced by
/// `CostTracker::summary`.
#[derive(Debug, Clone)]
pub struct CostSummary {
/// Class name copied from the tracked budget.
pub query_class: String,
/// RAM bytes recorded so far.
pub ram_bytes_used: u64,
/// RAM byte limit from the budget.
pub ram_bytes_limit: u64,
/// SSD random reads recorded so far.
pub ssd_random_reads_used: u32,
/// SSD random-read limit from the budget.
pub ssd_random_reads_limit: u32,
/// SSD sequential bytes recorded so far.
pub ssd_sequential_bytes_used: u64,
/// SSD sequential byte limit from the budget.
pub ssd_sequential_bytes_limit: u64,
/// CPU cycles recorded so far.
pub cpu_cycles_used: u64,
/// CPU cycle limit from the budget.
pub cpu_cycles_limit: u64,
/// Time elapsed since the tracker was created.
pub elapsed: Duration,
/// Latency target from the budget.
pub latency_target: Duration,
/// Whether the tracker reported exhaustion when the summary was taken.
pub exhausted: bool,
/// First recorded exhaustion reason, if any.
pub exhaustion_reason: Option<BudgetExhaustionReason>,
}
/// Gatekeeper that decides whether a query may start, based on per-class
/// concurrency limits and a global memory estimate.
pub struct AdmissionController {
/// Maximum concurrent admissions per class name, set via `set_class_limit`.
max_concurrent_per_class: parking_lot::RwLock<std::collections::HashMap<String, usize>>,
/// In-flight admission counters keyed by class name.
active_per_class: parking_lot::RwLock<std::collections::HashMap<String, AtomicUsize>>,
/// Sum of the memory estimates of currently admitted queries.
global_memory_pressure: AtomicU64,
/// Ceiling for `global_memory_pressure`; admissions that would pass it are rejected.
max_global_memory: u64,
/// Sleep interval between retries in `admit_with_backpressure`.
backpressure_wait: Duration,
}
/// Proof of admission returned by `AdmissionController::try_admit`.
///
/// Dropping the ticket hands its class and memory estimate back to the
/// controller (see the `Drop` impl).
pub struct AdmissionTicket {
/// Class name of the admitted query.
query_class: String,
/// Memory estimate reserved for this query at admission time.
estimated_memory: u64,
/// Controller that issued the ticket and receives the release on drop.
controller: Arc<AdmissionController>,
}
impl Drop for AdmissionTicket {
    /// Returns this ticket's class slot and memory estimate to the issuing
    /// controller.
    fn drop(&mut self) {
        let class = self.query_class.as_str();
        let reserved = self.estimated_memory;
        self.controller.release(class, reserved);
    }
}
impl AdmissionController {
    /// Creates a controller that admits queries while the sum of their memory
    /// estimates stays within `max_global_memory`.
    ///
    /// No per-class limits are configured initially and the back-pressure
    /// retry interval is 10 ms.
    pub fn new(max_global_memory: u64) -> Arc<Self> {
        Arc::new(Self {
            max_concurrent_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
            active_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
            global_memory_pressure: AtomicU64::new(0),
            max_global_memory,
            backpressure_wait: Duration::from_millis(10),
        })
    }

    /// Sets (or replaces) the maximum number of concurrently admitted queries
    /// for `query_class`.
    pub fn set_class_limit(self: &Arc<Self>, query_class: &str, max_concurrent: usize) {
        self.max_concurrent_per_class
            .write()
            .insert(query_class.to_string(), max_concurrent);
    }

    /// Tries to admit a query with the given budget without waiting.
    ///
    /// Admission fails (`None`) when the class already has its configured
    /// maximum of in-flight queries, or when reserving the memory estimate
    /// (half of `budget.ram_bytes_limit`) would exceed the global ceiling.
    /// On success the returned ticket releases both reservations when dropped.
    ///
    /// Every admission is counted against its class, whether or not a limit
    /// is configured, so that each release has a matching acquire and the
    /// counter cannot underflow when a limit is added while queries are
    /// already in flight.
    pub fn try_admit(self: &Arc<Self>, budget: &QueryBudget) -> Option<AdmissionTicket> {
        // Copy the limit out so the limits lock is not held while the
        // counters are touched.
        let class_limit = self
            .max_concurrent_per_class
            .read()
            .get(&budget.query_class)
            .copied();
        if !self.acquire_class_slot(&budget.query_class, class_limit) {
            return None;
        }

        let estimated_memory = budget.ram_bytes_limit / 2;
        let ceiling = self.max_global_memory;
        // Reserve with a compare-and-swap loop: the shared counter never
        // holds a value above the ceiling, and the addition cannot overflow.
        let reservation = self.global_memory_pressure.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |in_use| {
                in_use
                    .checked_add(estimated_memory)
                    .filter(|&total| total <= ceiling)
            },
        );
        if reservation.is_err() {
            self.release_class_counter(&budget.query_class);
            return None;
        }

        Some(AdmissionTicket {
            query_class: budget.query_class.clone(),
            estimated_memory,
            controller: Arc::clone(self),
        })
    }

    /// Atomically takes one in-flight slot for `query_class`.
    ///
    /// Returns `false` when `limit` is set and already reached.
    fn acquire_class_slot(&self, query_class: &str, limit: Option<usize>) -> bool {
        let claim = |counter: &AtomicUsize| {
            counter
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |active| match limit {
                    Some(max) if active >= max => None,
                    _ => active.checked_add(1),
                })
                .is_ok()
        };

        // Common case: the counter exists, so a shared lock is enough.
        {
            let counters = self.active_per_class.read();
            if let Some(counter) = counters.get(query_class) {
                return claim(counter);
            }
        }

        // First admission for this class: create the counter. `entry` also
        // covers another thread having inserted it in the meantime.
        let mut counters = self.active_per_class.write();
        let counter = counters
            .entry(query_class.to_string())
            .or_insert_with(|| AtomicUsize::new(0));
        claim(counter)
    }

    /// Like [`try_admit`](Self::try_admit), but retries until admission
    /// succeeds or `max_wait` has elapsed.
    ///
    /// The calling thread sleeps between attempts, never past the deadline.
    /// At least one attempt is made even when `max_wait` is zero.
    pub fn admit_with_backpressure(
        self: &Arc<Self>,
        budget: &QueryBudget,
        max_wait: Duration,
    ) -> Option<AdmissionTicket> {
        // Measuring elapsed time (rather than computing `now + max_wait`)
        // cannot overflow for very large `max_wait` values.
        let started = Instant::now();
        loop {
            if let Some(ticket) = self.try_admit(budget) {
                return Some(ticket);
            }
            let waited = started.elapsed();
            if waited >= max_wait {
                return None;
            }
            std::thread::sleep(self.backpressure_wait.min(max_wait - waited));
        }
    }

    /// Returns a ticket's memory reservation and class slot to the pool.
    fn release(&self, query_class: &str, estimated_memory: u64) {
        self.global_memory_pressure
            .fetch_sub(estimated_memory, Ordering::AcqRel);
        self.release_class_counter(query_class);
    }

    /// Gives back one in-flight slot for `query_class`, if the class is known.
    fn release_class_counter(&self, query_class: &str) {
        let counters = self.active_per_class.read();
        if let Some(counter) = counters.get(query_class) {
            // Saturating decrement: an unmatched release must not wrap the
            // counter to `usize::MAX`, which would block the class for good.
            let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |active| {
                active.checked_sub(1)
            });
        }
    }

    /// Returns a snapshot of memory pressure and per-class in-flight counts.
    ///
    /// `memory_utilization` is pressure divided by the ceiling, with a zero
    /// ceiling treated as one so the ratio is never NaN.
    pub fn metrics(&self) -> AdmissionMetrics {
        let counters = self.active_per_class.read();
        let active_per_class: std::collections::HashMap<String, usize> = counters
            .iter()
            .map(|(class, count)| (class.clone(), count.load(Ordering::Relaxed)))
            .collect();
        // Load once so the reported pressure and the ratio agree.
        let pressure = self.global_memory_pressure.load(Ordering::Relaxed);
        AdmissionMetrics {
            global_memory_pressure: pressure,
            max_global_memory: self.max_global_memory,
            memory_utilization: pressure as f64 / self.max_global_memory.max(1) as f64,
            active_per_class,
        }
    }
}
/// Snapshot produced by `AdmissionController::metrics`.
#[derive(Debug, Clone)]
pub struct AdmissionMetrics {
/// Sum of the memory estimates of currently admitted queries.
pub global_memory_pressure: u64,
/// Configured ceiling for `global_memory_pressure`.
pub max_global_memory: u64,
/// `global_memory_pressure` divided by `max_global_memory`.
pub memory_utilization: f64,
/// In-flight admission count per class name, for classes that have a counter.
pub active_per_class: std::collections::HashMap<String, usize>,
}
/// Registry of named query budgets, plus the hardware profile used to derive
/// new budgets from an SLA.
pub struct QueryClassRegistry {
/// Registered budgets keyed by class name.
classes: parking_lot::RwLock<std::collections::HashMap<String, QueryBudget>>,
/// Hardware profile passed to `QueryBudget::from_sla` by `create_from_sla`.
hardware: HardwareProfile,
}
impl QueryClassRegistry {
    /// Creates a registry for `hardware`, pre-populated with the
    /// `"low_latency"`, `"balanced"` and `"high_recall"` presets.
    pub fn new(hardware: HardwareProfile) -> Self {
        let presets = vec![
            ("low_latency", QueryBudget::low_latency()),
            ("balanced", QueryBudget::balanced()),
            ("high_recall", QueryBudget::high_recall()),
        ];
        let classes: std::collections::HashMap<String, QueryBudget> = presets
            .into_iter()
            .map(|(name, budget)| (name.to_string(), budget))
            .collect();
        Self {
            classes: parking_lot::RwLock::new(classes),
            hardware,
        }
    }

    /// Stores `budget` under its own class name, replacing any previous
    /// entry with that name.
    pub fn register(&self, budget: QueryBudget) {
        let key = budget.query_class.clone();
        let mut classes = self.classes.write();
        classes.insert(key, budget);
    }

    /// Returns a copy of the budget registered for `query_class`, if any.
    pub fn get(&self, query_class: &str) -> Option<QueryBudget> {
        let classes = self.classes.read();
        match classes.get(query_class) {
            Some(budget) => Some(budget.clone()),
            None => None,
        }
    }

    /// Builds a budget from an SLA using this registry's hardware profile.
    ///
    /// The result is returned to the caller only; it is not registered.
    pub fn create_from_sla(
        &self,
        query_class: &str,
        latency_target: Duration,
        recall_target: f32,
    ) -> QueryBudget {
        let hardware = &self.hardware;
        QueryBudget::from_sla(query_class, latency_target, recall_target, hardware)
    }
}
impl Default for QueryClassRegistry {
fn default() -> Self {
Self::new(HardwareProfile::default())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls override exactly the limits they name.
    #[test]
    fn test_budget_creation() {
        let one_mib: u64 = 1024 * 1024;
        let built = QueryBudget::new("test")
            .ram_bytes(one_mib)
            .ssd_random_reads(10)
            .latency_target(Duration::from_millis(50));

        assert_eq!(built.query_class, "test");
        assert_eq!(built.ram_bytes_limit, 1024 * 1024);
        assert_eq!(built.ssd_random_reads_limit, 10);
    }

    /// Charges within the limit succeed; the first charge past it fails and
    /// records the RAM reason.
    #[test]
    fn test_cost_tracker() {
        let tracker = CostTracker::new(QueryBudget::new("test").ram_bytes(1000).ssd_random_reads(2));

        // 500 and then 900 bytes in total: both within the 1000-byte limit.
        assert!(tracker.add_ram_bytes(500));
        assert!(!tracker.is_exhausted());
        assert!(tracker.add_ram_bytes(400));
        assert!(!tracker.is_exhausted());

        // 1100 bytes in total: over the limit.
        assert!(!tracker.add_ram_bytes(200));
        assert!(tracker.is_exhausted());
        assert_eq!(
            tracker.exhaustion_reason(),
            Some(BudgetExhaustionReason::RamBytesExceeded)
        );
    }

    /// A class limit of two admits two tickets, rejects a third, and admits
    /// again once a ticket is dropped.
    #[test]
    fn test_admission_controller() {
        let controller = AdmissionController::new(64 * 1024 * 1024);
        controller.set_class_limit("low_latency", 2);
        let budget = QueryBudget::low_latency();

        let first = controller.try_admit(&budget);
        assert!(first.is_some());
        let second = controller.try_admit(&budget);
        assert!(second.is_some());

        let rejected = controller.try_admit(&budget);
        assert!(rejected.is_none());

        drop(first);
        let readmitted = controller.try_admit(&budget);
        assert!(readmitted.is_some());
    }

    /// A budget derived from an SLA has non-zero RAM and CPU limits.
    #[test]
    fn test_budget_from_sla() {
        let profile = HardwareProfile::default();
        let derived = QueryBudget::from_sla("custom", Duration::from_millis(50), 0.95, &profile);
        assert!(derived.ram_bytes_limit > 0);
        assert!(derived.cpu_cycles_limit > 0);
    }
}